1
2
3
4
5
6
7
8
9
10
11
12
13
14 __doc__ = """ZenCommand
15
16 Run Command plugins periodically.
17
18 """
19
20 import time
21 from pprint import pformat
22 import logging
23 log = logging.getLogger("zen.zencommand")
24 from sets import Set
25
26 from twisted.internet import reactor, defer, error
27 from twisted.internet.protocol import ProcessProtocol
28 from twisted.python import failure
29 from twisted.spread import pb
30
31 import Globals
32 from Products.ZenUtils.Driver import drive, driveLater
33 from Products.ZenUtils.Utils import unused
34 from Products.ZenRRD.RRDDaemon import RRDDaemon
35 from Products.ZenRRD.RRDUtil import RRDUtil
36 from Products.DataCollector.SshClient import SshClient
37 from Products.ZenEvents.ZenEventClasses import Clear
38 from Products.ZenRRD.CommandParser import ParsedResults
39
40 from Products.DataCollector import Plugins
41 unused(Plugins)
# NOTE(review): not referenced anywhere in the visible part of this file;
# presumably an upper bound on pooled/concurrent SSH connections -- confirm
# against its users before relying on it.
MAX_CONNECTIONS = 256
43
44
45
47 """
48 Error for a defered call taking too long to complete
49 """
50
52 Exception.__init__(self)
53 self.args = args
54
55
def Timeout(deferred, seconds, obj):
    """
    Arm a watchdog timer on C{deferred}.

    If C{deferred} has not fired within C{seconds} seconds it is errbacked
    with a L{TimeoutError} wrapping C{obj}.  If the deferred fires first
    (with either a result or a failure), the pending timer is cancelled.

    @param deferred: the deferred to guard
    @param seconds: how long to wait before forcing the timeout
    @param obj: value stored in the TimeoutError so errbacks can tell what
        timed out
    @return: the same C{deferred}, with the timer-cancelling callback added
    """
    def _disarm(result, watchdog):
        # Runs on success and on failure.  When the failure *is* our own
        # timeout, the DelayedCall has already been called and must not be
        # cancelled again.
        if not watchdog.called:
            watchdog.cancel()
        return result

    def _expire(d, what):
        d.errback(failure.Failure(TimeoutError(what)))

    watchdog = reactor.callLater(seconds, _expire, deferred, obj)
    deferred.addBoth(_disarm, watchdog)
    return deferred
74
75
77 """
78 Provide deferred process execution
79 """
80 stopped = None
81 exitCode = None
82 output = ''
83
85 "Kick off the process: run it local"
86 log.debug('running %r' % cmd.command)
87
88 shell = '/bin/sh'
89 self.cmdline = (shell, '-c', 'exec %s' % cmd.command)
90 self.command = ' '.join(self.cmdline)
91 log.debug('cmd line: %r' % self.command)
92 reactor.spawnProcess(self, shell, self.cmdline, env=None)
93
94 d = Timeout(defer.Deferred(), cmd.deviceConfig.commandTimeout, cmd)
95 self.stopped = d
96 self.stopped.addErrback(self.timeout)
97 return d
98
100 "Kill a process if it takes too long"
101 try:
102 self.transport.signalProcess('KILL')
103 except error.ProcessExitedAlready:
104 log.debug("Command already exited: %s" % self.command)
105 return value
106
107
109 "Store up the output as it arrives from the process"
110 self.output += data
111
112
124
125
127 """
128 Connection to SSH server at the remote device
129 """
130
134
141
142
149
150
151 - def check(self, ip, timeout=2):
152 "Turn off blocking SshClient.test method"
153 return True
154
155
157 "We don't need to track commands/results when they complete"
158 SshClient.clientFinished(self)
159 self.commands = []
160 self.results = []
161
162
164 """
165 Cache all the Ssh connections so they can be managed
166 """
167
170
171
172 - def get(self, cmd):
173 "Make a new SSH connection if there isn't one available"
174 dc = cmd.deviceConfig
175 result = self.pool.get(dc.device, None)
176 if result is None:
177 log.debug("Creating connection to %s", dc.device)
178 options = Options(dc.username, dc.password,
179 dc.loginTimeout, dc.commandTimeout,
180 dc.keyPath, dc.concurrentSessions)
181
182 result = MySshClient(dc.device, dc.ipAddress, dc.port,
183 options=options)
184 result.run()
185 self.pool[dc.device] = result
186 return result
187
188
197
198
200 "symetric close that matches get() method"
201 self._close(cmd.deviceConfig.device)
202
203
215
216
218 """
219 Run a single command across a cached SSH connection
220 """
221 exitCode = None
222 output = None
223
226
227
243
244
246 "Deal with slow executing command/connection (close it)"
247 cmd, = arg.value.args
248
249
250 self.pool.close(cmd)
251 return arg
252
253
255 "Deliver ourselves to the starter with the proper attributes"
256 if isinstance(value, failure.Failure):
257 return value
258 self.output, self.exitCode = value
259 return self
260
261
# Register DeviceConfig with Twisted Perspective Broker so that copies
# received from the remote side are unjellied as local DeviceConfig objects.
pb.setUnjellyableForClass(DeviceConfig, DeviceConfig)
273
274
289
# Register DataPointConfig with Twisted Perspective Broker so that copies
# received from the remote side are unjellied as local DataPointConfig objects.
pb.setUnjellyableForClass(DataPointConfig, DataPointConfig)
291
292 -class Cmd(pb.Copyable, pb.RemoteCopy):
293 """
294 Holds the config of every command to be run
295 """
296 command = None
297 useSsh = False
298 cycleTime = None
299 eventClass = None
300 eventKey = None
301 severity = 3
302 lastStart = 0
303 lastStop = 0
304 result = None
305
306
309
312
313
315 cmd, args = (self.command + ' ').split(' ', 1)
316 cmd = cmd.split('/')[-1]
317 return '%s %s' % (cmd, args)
318
319
324
325
336
337
355
356
367
371
373 "Provide a value that establishes the uniqueness of this command"
374 return '%'.join(map(str, [self.useSsh, self.cycleTime,
375 self.severity, self.command]))
376
# Cmd derives from both pb.Copyable and pb.RemoteCopy; registering it here
# lets copies received over Perspective Broker be rebuilt as Cmd objects.
pb.setUnjellyableForClass(Cmd, Cmd)
378
392
393
410
411
413 """
414 Go through the Cmd objects in config.commands and update the config on the
415 command with config. If currentDict has the command on it then use that
416 one, otherwise use the command from config. If the device does not have a
417 username set, then don't yield commands that use SSH.
418
419 Parameters:
420
421 config - one of the configurations that was returned by calling
422 getDataSourceCommands on zenhub.
423
424 currentDict - a dictionary that maps (device, command) to Cmd object
425 for the commands currently on zencommand's schedule.
426 """
427
428 suppressed = []
429
430 for cmd in config.commands:
431
432 key = (config.device, cmd.command)
433
434 if not config.username and cmd.useSsh:
435 warnUsernameNotSet(config.device, cmd, sendEvent, suppressed)
436 if config.device not in suppressed:
437 suppressed.append(config.device)
438 if key in currentDict:
439 del currentDict[key]
440 continue
441
442 if key in currentDict: newCmd = currentDict.pop(key)
443 else : newCmd = cmd
444
445 yield newCmd.updateConfig(cmd, config)
446
447
449 """
450 Daemon code to schedule commands and run them.
451 """
452
453 initialServices = RRDDaemon.initialServices + ['CommandConfig']
454
461
463 self.log.debug("zenhub has asked us to delete device %s" % doomed)
464 self.schedule = [c for c in self.schedule if c.deviceConfig.device != doomed]
465
469
471 self.log.debug("zenhub sent updated device list %s" % devices)
472 updated = []
473 lastChanges = dict(devices)
474 keep = []
475 for cmd in self.schedule:
476 if cmd.deviceConfig.device in lastChanges:
477 if cmd.lastChange > lastChanges[cmd.device]:
478 updated.append(cmd.deviceConfig.device)
479 keep.append(cmd)
480 else:
481 self.log.info("Removing all commands for %s", cmd.deviceConfig.device)
482 self.schedule = keep
483 if updated:
484 self.log.info("Fetching the config for %s", updated)
485 d = self.model().callRemote('getDataSourceCommands', devices)
486 d.addCallback(self.updateConfig, updated)
487 d.addErrback(self.error)
488
506
525
526
528 """
529 Run through the schedule and start anything that needs to be done.
530 Set a timer if we have nothing to do.
531 """
532 if not self.options.cycle:
533 for cmd in self.schedule:
534 if cmd.running() or cmd.lastStart == 0:
535 break
536 else:
537 self.stop()
538 return
539 try:
540 if self.timeout and not self.timeout.called:
541 self.timeout.cancel()
542 self.timeout = None
543 def compare(x, y):
544 return cmp(x.nextRun(), y.nextRun())
545 self.schedule.sort(compare)
546 self.pool.trimConnections(self.schedule)
547 earliest = None
548 running = 0
549 now = time.time()
550 for c in self.schedule:
551 if c.running():
552 running += 1
553
554 for c in self.schedule:
555 if running >= self.options.parallel:
556 break
557 if c.nextRun() <= now:
558 c.start(self.pool).addBoth(self.finished)
559 running += 1
560 else:
561 earliest = c.nextRun() - now
562 break
563
564 if earliest is not None:
565 self.log.debug("Next command in %d seconds", int(earliest))
566 self.timeout = reactor.callLater(max(1, earliest),
567 self.processSchedule)
568 except Exception, ex:
569 self.log.exception(ex)
570
571
582
584 """
585 The command has finished. cmdOrErr is either a Cmd instance or a
586 twisted failure.
587 """
588 self.executed += 1
589 if isinstance(cmdOrErr, failure.Failure):
590 self.error(cmdOrErr)
591 else:
592 cmd = cmdOrErr
593 self.sendCmdEvent(cmd, Clear, "Clear")
594 self.parseResults(cmd)
595 self.processSchedule()
596
598 """
599 The finished method indicated that there was a failure. This method
600 is also called by RRDDaemon.errorStop.
601 """
602 if isinstance(err.value, TimeoutError):
603 cmd, = err.value.args
604 msg = "Command timed out on device %s: %r" % (
605 cmd.deviceConfig.device, cmd.command)
606 self.log.warning(msg)
607 self.sendCmdEvent(cmd, cmd.severity, msg)
608 else:
609 self.log.exception(err.value)
610
612 """
613 Process the results of our command-line, send events
614 and check datapoints.
615
616 @param cmd: command
617 @type: cmd object
618 """
619 self.log.debug('The result of "%s" was "%r"', cmd.command, cmd.result.output)
620 results = ParsedResults()
621 try:
622 parser = cmd.parser.create()
623 except Exception, ex:
624 self.log.exception("Error loading parser %s" % cmd.parser)
625 import traceback
626 self.sendEvent(dict(device=cmd.deviceConfig.device,
627 summary="Error loading parser %s" % cmd.parser,
628 component="zencommand",
629 message=traceback.format_exc(),
630 agent="zencommand",
631 ))
632 return
633 parser.processResults(cmd, results)
634
635 for ev in results.events:
636 self.sendEvent(ev, device=cmd.deviceConfig.device)
637
638 for dp, value in results.values:
639 self.log.debug("Storing %s = %s into %s" % (dp.id, value, dp.rrdPath))
640 value = self.rrd.save(dp.rrdPath,
641 value,
642 dp.rrdType,
643 dp.rrdCreateCommand,
644 cmd.cycleTime,
645 dp.rrdMin,
646 dp.rrdMax)
647 self.log.debug("RRD save result: %s" % value)
648 for ev in self.thresholds.check(dp.rrdPath, time.time(), value):
649 eventKey = cmd.getEventKey(dp)
650 if 'eventKey' in ev:
651 ev['eventKey'] = '%s|%s' % (eventKey, ev['eventKey'])
652 else:
653 ev['eventKey'] = eventKey
654 ev['component'] = dp.component
655 self.sendEvent(ev)
656
698
699 return drive(doFetchConfig)
700
701
702 - def start(self, driver):
703 """
704 Fetch the configuration and return a deferred for its completion.
705 Also starts the config cycle
706 """
707 ex = None
708 try:
709 self.log.debug('Fetching configuration from zenhub')
710 yield self.fetchConfig()
711 driver.next()
712 self.log.debug('Finished config fetch')
713 except Exception, ex:
714 self.log.exception(ex)
715 driveLater(self.configCycleInterval * 60, self.start)
716 if ex:
717 raise ex
718
720 RRDDaemon.buildOptions(self)
721
722 self.parser.add_option('--parallel', dest='parallel',
723 default=10, type='int',
724 help="Number of devices to collect at one time")
725
730
731
if __name__ == '__main__':
    # Import the daemon class through its full package path instead of using
    # the copy defined in __main__.  NOTE(review): presumably so the class
    # carries its canonical module name (e.g. for PB registration) -- confirm.
    from Products.ZenRRD.zencommand import zencommand
    daemon = zencommand()
    daemon.run()
736