1
2
3
4
5
6
7
8
9
10
11
12
13
14 __doc__ = """ZenCommand
15
16 Run Command plugins periodically.
17
18 """
19
20 import time
21 from pprint import pformat
22 import logging
23 log = logging.getLogger("zen.zencommand")
24 from sets import Set
25
26 from twisted.internet import reactor, defer, error
27 from twisted.internet.protocol import ProcessProtocol
28 from twisted.python import failure
29 from twisted.spread import pb
30
31 import Globals
32 from Products.ZenUtils.Driver import drive, driveLater
33 from Products.ZenUtils.Utils import unused, getExitMessage
34 from Products.ZenRRD.RRDDaemon import RRDDaemon
35 from Products.ZenRRD.RRDUtil import RRDUtil
36 from Products.DataCollector.SshClient import SshClient
37 from Products.ZenEvents.ZenEventClasses import Clear, Error
38 from Products.ZenRRD.CommandParser import ParsedResults
39
40 from Products.DataCollector import Plugins
41 unused(Plugins)
42 MAX_CONNECTIONS = 256
43
44
45
47 """
48 Error for a defered call taking too long to complete
49 """
50
52 Exception.__init__(self)
53 self.args = args
54
55
def Timeout(deferred, seconds, obj):
    """
    Attach a watchdog to a deferred so that it fails when it takes too long.

    A delayed call is registered with the reactor.  If it goes off, the
    deferred's errback is fired with a Failure wrapping TimeoutError(obj).
    Whenever the deferred fires (result or failure), the delayed call is
    cancelled unless it has already run.

    @param deferred: the deferred to watch
    @param seconds: delay handed to reactor.callLater
    @param obj: value passed to the TimeoutError constructor
    @return: the same deferred that was passed in
    """

    def expire(watched, payload):
        "the deadline passed: fail the deferred"
        watched.errback(failure.Failure(TimeoutError(payload)))

    def disarm(result, pending):
        "the deferred fired: drop the delayed call unless it already ran"
        if pending.called:
            return result
        pending.cancel()
        return result

    watchdog = reactor.callLater(seconds, expire, deferred, obj)
    deferred.addBoth(disarm, watchdog)
    return deferred
74
75
77 """
78 Provide deferred process execution
79 """
80 stopped = None
81 exitCode = None
82 output = ''
83
85 "Kick off the process: run it local"
86 log.debug('running %r' % cmd.command)
87
88 shell = '/bin/sh'
89 self.cmdline = (shell, '-c', 'exec %s' % cmd.command)
90 self.command = ' '.join(self.cmdline)
91 log.debug('cmd line: %r' % self.command)
92 reactor.spawnProcess(self, shell, self.cmdline, env=None)
93
94 d = Timeout(defer.Deferred(), cmd.deviceConfig.commandTimeout, cmd)
95 self.stopped = d
96 self.stopped.addErrback(self.timeout)
97 return d
98
100 "Kill a process if it takes too long"
101 try:
102 self.transport.signalProcess('KILL')
103 except error.ProcessExitedAlready:
104 log.debug("Command already exited: %s" % self.command)
105 return value
106
107
109 "Store up the output as it arrives from the process"
110 self.output += data
111
112
124
125
127 """
128 Connection to SSH server at the remote device
129 """
130
134
141
142
144 "Forward the results of the command execution to the starter"
145
146
147 d = self.defers.pop(command, None)
148 if d is None:
149 log.error("Internal error where deferred object not in dictionary." \
150 " Command = '%s' Data = '%s' Code = '%s'",
151 command, data, code)
152 elif not d.called:
153 d.callback((data, code))
154
155
def check(self, ip, timeout=2):
    """
    Disable the blocking SshClient.test method by reporting success.

    @param ip: not used
    @param timeout: not used
    @return: always True
    """
    return True
159
160
162 "We don't need to track commands/results when they complete"
163 SshClient.clientFinished(self)
164 self.cmdmap = {}
165 self._commands = []
166 self.results = []
167
168
170 """
171 Cache all the Ssh connections so they can be managed
172 """
173
175 self.pool = {}
176 self.eventSender = None
177
def get(self, cmd):
    """
    Return the SSH client pooled for the command's device, creating,
    starting and caching a new one when the pool has none.

    @param cmd: command whose deviceConfig names the device and carries
                the connection settings
    @return: the client stored in the pool for that device
    """
    devConf = cmd.deviceConfig
    client = self.pool.get(devConf.device, None)
    if client is not None:
        # already connected (or connecting): reuse it
        return client

    log.debug("Creating connection to %s", devConf.device)
    sshOptions = Options(devConf.username, devConf.password,
                         devConf.loginTimeout, devConf.commandTimeout,
                         devConf.keyPath, devConf.concurrentSessions)
    client = MySshClient(devConf.device, devConf.ipAddress, devConf.port,
                         options=sshOptions)
    sender = self.eventSender
    if sender is not None:
        # route the client's events through the pool's event sender
        client.sendEvent = sender.sendEvent
    client.run()
    self.pool[devConf.device] = client
    return client
195
199
208
209
211 "symetric close that matches get() method"
212 self._close(cmd.deviceConfig.device)
213
214
216 "reduce the number of connections using the schedule for guidance"
217
218 devices = []
219 for c in schedule:
220 device = c.deviceConfig.device
221 if device not in devices:
222 devices.append(device)
223
224 while devices and len(self.pool) > MAX_CONNECTIONS:
225 self._close(devices.pop())
226
227
229 """
230 Run a single command across a cached SSH connection
231 """
232 exitCode = None
233 output = None
234
237
238
254
255
257 "Deal with slow executing command/connection (close it)"
258 cmd, = arg.value.args
259
260
261 self.pool.close(cmd)
262 return arg
263
264
266 "Deliver ourselves to the starter with the proper attributes"
267 if isinstance(value, failure.Failure):
268 return value
269 self.output, self.exitCode = value
270 return self
271
272
283 pb.setUnjellyableForClass(DeviceConfig, DeviceConfig)
284
285
300
301 pb.setUnjellyableForClass(DataPointConfig, DataPointConfig)
302
303 -class Cmd(pb.Copyable, pb.RemoteCopy):
304 """
305 Holds the config of every command to be run
306 """
307 command = None
308 useSsh = False
309 cycleTime = None
310 eventClass = None
311 eventKey = None
312 severity = 3
313 lastStart = 0
314 lastStop = 0
315 result = None
316
317
320
323
324
326 cmd, args = (self.command + ' ').split(' ', 1)
327 cmd = cmd.split('/')[-1]
328 return '%s %s' % (cmd, args)
329
330
335
336
347
348
366
367
381
385
387 "Provide a value that establishes the uniqueness of this command"
388 return '%'.join(map(str, [self.useSsh, self.cycleTime,
389 self.severity, self.command]))
390
391 pb.setUnjellyableForClass(Cmd, Cmd)
392
406
407
409
def __init__(self, config, scheduledCmds, sendEvent):
    """
    Store the inputs and set up the bookkeeping attributes.

    @param config: one of the configurations that was returned by calling
                   getDataSourceCommands on zenhub
    @param scheduledCmds: a dictionary that maps (device, command) to the
                          Cmd object for the commands currently on
                          zencommand's schedule
    @param sendEvent: a function that sends events to zenhub using twisted
                      perspective broker
    """
    self._config, self._scheduledCmds, self._sendEvent = \
        config, scheduledCmds, sendEvent
    # device taken from the config
    self._device = config.device
    # starts out empty
    self._suppressed = []
    # fixed summary text
    self._summary = 'zCommandUsername is not set'
425
435
437 """
438 Warn that the username is not set for device and the SSH command cannot be
439 executed.
440 """
441 if self._device not in self._suppressed:
442 log.warning(self._summary + ' for %s' % self._device)
443 self._sendUsernameEvent(Error)
444 msg = 'username not configured for %s. skipping %s.'
445 log.debug(msg % (self._device, command))
446
448 """
449 Go through the Cmd objects in config.commands and update the config on
450 the command with config. If currentDict has the command on it then use
451 that one, otherwise use the command from config. If the device does not have a
452 username set, then don't yield commands that use SSH.
453 """
454 for cmd in self._config.commands:
455 key = (self._device, cmd.command)
456 if cmd.useSsh:
457 if self._config.username:
458 self._sendUsernameEvent(Clear)
459 else:
460 self._warnUsernameNotSet(cmd.command)
461 if self._device not in self._suppressed:
462 self._suppressed.append(self._device)
463 if key in self._scheduledCmds:
464 del self._scheduledCmds[key]
465 continue
466 if key in self._scheduledCmds:
467 newCmd = self._scheduledCmds.pop(key)
468 else:
469 newCmd = cmd
470 yield newCmd.updateConfig(cmd, self._config)
471 self._suppressed = []
472
473
475 """
476 Daemon code to schedule commands and run them.
477 """
478
479 initialServices = RRDDaemon.initialServices + ['CommandConfig']
480
482 RRDDaemon.__init__(self, 'zencommand')
483 self.schedule = []
484 self.timeout = None
485 self.pool = SshPool()
486 self.pool.eventSender = self
487 self.executed = 0
488
490 self.log.debug("zenhub has asked us to delete device %s" % doomed)
491 self.schedule = [c for c in self.schedule if c.deviceConfig.device != doomed]
492
497
499 self.log.debug("zenhub sent updated device list %s" % devices)
500 updated = []
501 lastChanges = dict(devices)
502 keep = []
503 for cmd in self.schedule:
504 if cmd.deviceConfig.device in lastChanges:
505 if cmd.lastChange > lastChanges[cmd.device]:
506 updated.append(cmd.deviceConfig.device)
507 keep.append(cmd)
508 else:
509 self.log.info("Removing all commands for %s", cmd.deviceConfig.device)
510 self.schedule = keep
511 if updated:
512 self.log.info("Fetching the config for %s", updated)
513 d = self.model().callRemote('getDataSourceCommands', devices)
514 d.addCallback(self.updateConfig, updated)
515 d.addErrback(self.error)
516
535
554
555
557 """
558 Run through the schedule and start anything that needs to be done.
559 Set a timer if we have nothing to do.
560 """
561 if not self.options.cycle:
562 for cmd in self.schedule:
563 if cmd.running() or cmd.lastStart == 0:
564 break
565 else:
566 self.stop()
567 return
568 try:
569 if self.timeout and not self.timeout.called:
570 self.timeout.cancel()
571 self.timeout = None
572 def compare(x, y):
573 return cmp(x.nextRun(), y.nextRun())
574 self.schedule.sort(compare)
575 self.pool.trimConnections(self.schedule)
576 earliest = None
577 running = 0
578 now = time.time()
579 for c in self.schedule:
580 if c.running():
581 running += 1
582
583 for c in self.schedule:
584 if running >= self.options.parallel:
585 break
586 if c.nextRun() <= now:
587 c.start(self.pool).addBoth(self.finished)
588 running += 1
589 else:
590 earliest = c.nextRun() - now
591 break
592
593 if earliest is not None:
594 self.pool.reinitializeCollectorClients()
595 self.log.debug("Next command in %d seconds", int(earliest))
596 self.timeout = reactor.callLater(max(1, earliest),
597 self.processSchedule)
598 except Exception, ex:
599 self.log.exception(ex)
600
601
612
614 """
615 The command has finished. cmdOrErr is either a Cmd instance or a
616 twisted failure.
617 """
618 self.executed += 1
619 if isinstance(cmdOrErr, failure.Failure):
620 self.error(cmdOrErr)
621 else:
622 cmd = cmdOrErr
623 self._handleExitCode(cmd)
624 self.parseResults(cmd)
625 self.processSchedule()
626
637
651
653 """
654 Process the results of our command-line, send events
655 and check datapoints.
656
657 @param cmd: command
658 @type: cmd object
659 """
660 self.log.debug('The result of "%s" was "%r"', cmd.command, cmd.result.output)
661 results = ParsedResults()
662 try:
663 parser = cmd.parser.create()
664 except Exception, ex:
665 self.log.exception("Error loading parser %s" % cmd.parser)
666 import traceback
667 self.sendEvent(dict(device=cmd.deviceConfig.device,
668 summary="Error loading parser %s" % cmd.parser,
669 component="zencommand",
670 message=traceback.format_exc(),
671 agent="zencommand",
672 ))
673 return
674 parser.processResults(cmd, results)
675
676 for ev in results.events:
677 self.sendEvent(ev, device=cmd.deviceConfig.device)
678
679 for dp, value in results.values:
680 self.log.debug("Storing %s = %s into %s" % (dp.id, value, dp.rrdPath))
681 value = self.rrd.save(dp.rrdPath,
682 value,
683 dp.rrdType,
684 dp.rrdCreateCommand,
685 cmd.cycleTime,
686 dp.rrdMin,
687 dp.rrdMax)
688 self.log.debug("RRD save result: %s" % value)
689 for ev in self.thresholds.check(dp.rrdPath, time.time(), value):
690 eventKey = cmd.getEventKey(dp)
691 if 'eventKey' in ev:
692 ev['eventKey'] = '%s|%s' % (eventKey, ev['eventKey'])
693 else:
694 ev['eventKey'] = eventKey
695 ev['component'] = dp.component
696 self.sendEvent(ev)
697
739
740 return drive(doFetchConfig)
741
742
743 - def start(self, driver):
744 """
745 Fetch the configuration and return a deferred for its completion.
746 Also starts the config cycle
747 """
748 ex = None
749 try:
750 self.log.debug('Fetching configuration from zenhub')
751 yield self.fetchConfig()
752 driver.next()
753 self.log.debug('Finished config fetch')
754 except Exception, ex:
755 self.log.exception(ex)
756 driveLater(self.configCycleInterval * 60, self.start)
757 if ex:
758 raise ex
759
766
771
772
if __name__ == '__main__':
    # Script entry point: build the daemon from the class as imported
    # through its package path, then run it.
    from Products.ZenRRD.zencommand import zencommand
    daemon = zencommand()
    daemon.run()
777