1
2
3
4
5
6
7
8
9
10
11
12
13
14 __doc__ = """ZenCommand
15
16 Run Command plugins periodically.
17
18 """
19
20 import time
21 from pprint import pformat
22 import logging
23 log = logging.getLogger("zen.zencommand")
24 from sets import Set
25
26 from twisted.internet import reactor, defer, error
27 from twisted.internet.protocol import ProcessProtocol
28 from twisted.python import failure
29 from twisted.spread import pb
30
31 import Globals
32 from Products.ZenUtils.Driver import drive, driveLater
33
34 from Products.ZenRRD.RRDDaemon import RRDDaemon
35 from Products.ZenRRD.RRDUtil import RRDUtil
36 from Products.DataCollector.SshClient import SshClient
37
38 from Products.ZenRRD.CommandParser import getParser, ParsedResults
39
40 MAX_CONNECTIONS = 256
41
42
43
45 """
46 Error for a deferred call taking too long to complete
47 """
48
50 Exception.__init__(self)
51 self.args = args
52
53
def Timeout(deferred, seconds, obj):
    """
    Arm a watchdog on a deferred so that it fails if it takes too long.

    A delayed call is scheduled on the reactor.  If it fires before the
    deferred does, the deferred is errback'ed with a Failure wrapping
    TimeoutError(obj).  Whenever the deferred fires (with a result or a
    failure), a still-pending delayed call is cancelled and the
    result/failure is passed along untouched.

    @param deferred: the deferred to guard
    @param seconds: delay handed to reactor.callLater
    @param obj: value given to TimeoutError when the watchdog fires
    @return: the same deferred that was passed in
    """

    def expire():
        "the watchdog fired first: fail the deferred"
        deferred.errback(failure.Failure(TimeoutError(obj)))

    watchdog = reactor.callLater(seconds, expire)

    def disarm(outcome):
        "the deferred fired, possibly because of the watchdog itself"
        # Only a delayed call that has not run yet can be cancelled.
        if not watchdog.called:
            watchdog.cancel()
        return outcome

    deferred.addBoth(disarm)
    return deferred
72
73
75 """
76 Provide deferred process execution
77 """
78 stopped = None
79 exitCode = None
80 output = ''
81
83 "Kick off the process: run it local"
84 log.debug('running %r' % cmd.command)
85
86 shell = '/bin/sh'
87 self.cmdline = (shell, '-c', 'exec %s' % cmd.command)
88 self.command = ' '.join(self.cmdline)
89 log.debug('cmd line: %r' % self.command)
90 reactor.spawnProcess(self, shell, self.cmdline, env=None)
91
92 d = Timeout(defer.Deferred(), cmd.deviceConfig.commandTimeout, cmd)
93 self.stopped = d
94 self.stopped.addErrback(self.timeout)
95 return d
96
98 "Kill a process if it takes too long"
99 try:
100 self.transport.signalProcess('KILL')
101 except error.ProcessExitedAlready:
102 log.debug("Command already exited: %s" % self.command)
103 return value
104
105
107 "Store up the output as it arrives from the process"
108 self.output += data
109
110
122
123
125 """
126 Connection to SSH server at the remote device
127 """
128
132
139
140
147
148
def check(self, ip, timeout=2):
    """
    Turn off the blocking SshClient.test method.

    Both arguments are accepted and ignored; no test is performed.

    @param ip: unused
    @param timeout: unused
    @return: always True
    """
    return True
152
153
155 "We don't need to track commands/results when they complete"
156 SshClient.clientFinished(self)
157 self.commands = []
158 self.results = []
159
160
162 """
163 Cache all the Ssh connections so they can be managed
164 """
165
168
169
def get(self, cmd):
    """
    Return the SSH connection for the command's device, creating one if
    the pool has none.

    The pool is keyed by the device name found on cmd.deviceConfig.  On a
    miss a MySshClient is built from the device configuration, started
    with run() and stored in the pool before being returned.

    @param cmd: command object carrying a deviceConfig attribute
    @return: the pooled (possibly freshly created) connection
    """
    config = cmd.deviceConfig
    client = self.pool.get(config.device, None)
    if client is not None:
        # Already have a connection for this device: reuse it.
        return client

    log.debug("Creating connection to %s", config.device)
    options = Options(config.username, config.password,
                      config.loginTimeout, config.commandTimeout,
                      config.keyPath, config.concurrentSessions)
    client = MySshClient(config.device, config.ipAddress, config.port,
                         options=options)
    client.run()
    self.pool[config.device] = client
    return client
185
186
195
196
198 "symmetric close that matches get() method"
199 self._close(cmd.deviceConfig.device)
200
201
213
214
216 """
217 Run a single command across a cached SSH connection
218 """
219 exitCode = None
220 output = None
221
224
225
241
242
244 "Deal with slow executing command/connection (close it)"
245 cmd, = arg.value.args
246
247
248 self.pool.close(cmd)
249 return arg
250
251
253 "Deliver ourselves to the starter with the proper attributes"
254 if isinstance(value, failure.Failure):
255 return value
256 self.output, self.exitCode = value
257 return self
258
259
270 pb.setUnjellyableForClass(DeviceConfig, DeviceConfig)
271
272
287
288 pb.setUnjellyableForClass(DataPointConfig, DataPointConfig)
289
290 -class Cmd(pb.Copyable, pb.RemoteCopy):
291 """
292 Holds the config of every command to be run
293 """
294 command = None
295 useSsh = False
296 cycleTime = None
297 eventClass = None
298 eventKey = None
299 severity = 3
300 lastStart = 0
301 lastStop = 0
302 result = None
303
304
307
310
311
313 cmd, args = (self.command + ' ').split(' ', 1)
314 cmd = cmd.split('/')[-1]
315 return '%s %s' % (cmd, args)
316
317
322
323
334
335
346
347
358
362
364 "Provide a value that establishes the uniqueness of this command"
365 return '%'.join(map(str, [self.useSsh, self.cycleTime,
366 self.severity, self.command]))
367
368 pb.setUnjellyableForClass(Cmd, Cmd)
369
383
384
401
402
404 """
405 Go through the Cmd objects in config.commands and update the config on the
406 command with config. If currentDict has the command on it then use that
407 one, otherwise use the command from config. If the device does not have a
408 username set, then don't yield commands that use SSH.
409
410 Parameters:
411
412 config - one of the configurations that was returned by calling
413 getDataSourceCommands on zenhub.
414
415 currentDict - a dictionary that maps (device, command) to Cmd object
416 for the commands currently on zencommand's schedule.
417 """
418
419 suppressed = []
420
421 for cmd in config.commands:
422
423 key = (config.device, cmd.command)
424
425 if not config.username and cmd.useSsh:
426 warnUsernameNotSet(config.device, cmd, sendEvent, suppressed)
427 if config.device not in suppressed:
428 suppressed.append(config.device)
429 if key in currentDict:
430 del currentDict[key]
431 continue
432
433 if key in currentDict: newCmd = currentDict.pop(key)
434 else : newCmd = cmd
435
436 yield newCmd.updateConfig(cmd, config)
437
438
440 """
441 Daemon code to schedule commands and run them.
442 """
443
444 initialServices = RRDDaemon.initialServices + ['CommandConfig']
445
452
454 self.log.debug("zenhub has asked us to delete device %s" % doomed)
455 self.schedule = [c for c in self.schedule if c.deviceConfig.device != doomed]
456
460
462 self.log.debug("zenhub sent updated device list %s" % devices)
463 updated = []
464 lastChanges = dict(devices)
465 keep = []
466 for cmd in self.schedule:
467 if cmd.deviceConfig.device in lastChanges:
468 if cmd.lastChange > lastChanges[cmd.device]:
469 updated.append(cmd.deviceConfig.device)
470 keep.append(cmd)
471 else:
472 self.log.info("Removing all commands for %s", cmd.deviceConfig.device)
473 self.schedule = keep
474 if updated:
475 self.log.info("Fetching the config for %s", updated)
476 d = self.model().callRemote('getDataSourceCommands', devices)
477 d.addCallback(self.updateConfig, updated)
478 d.addErrback(self.error)
479
497
516
517
519 """
520 Run through the schedule and start anything that needs to be done.
521 Set a timer if we have nothing to do.
522 """
523 if not self.options.cycle:
524 for cmd in self.schedule:
525 if cmd.running() or cmd.lastStart == 0:
526 break
527 else:
528 self.stop()
529 return
530 try:
531 if self.timeout and not self.timeout.called:
532 self.timeout.cancel()
533 self.timeout = None
534 def compare(x, y):
535 return cmp(x.nextRun(), y.nextRun())
536 self.schedule.sort(compare)
537 self.pool.trimConnections(self.schedule)
538 earliest = None
539 running = 0
540 now = time.time()
541 for c in self.schedule:
542 if c.running():
543 running += 1
544
545 for c in self.schedule:
546 if running >= self.options.parallel:
547 break
548 if c.nextRun() <= now:
549 c.start(self.pool).addBoth(self.finished)
550 running += 1
551 else:
552 earliest = c.nextRun() - now
553 break
554
555 if earliest is not None:
556 self.log.debug("Next command in %d seconds", int(earliest))
557 self.timeout = reactor.callLater(max(1, earliest),
558 self.processSchedule)
559 except Exception, ex:
560 self.log.exception(ex)
561
562
570
571
586
588 """
589 Process the results of our command-line, send events
590 and check datapoints.
591
592 @param cmd: command
593 @type: cmd object
594 """
595 self.log.debug('The result of "%s" was "%r"', cmd.command, cmd.result.output)
596 results = ParsedResults()
597 try:
598 parser = getParser(cmd.parser)
599 except Exception, ex:
600 self.log.exception("Error loading parser %s" % cmd.parser)
601 import traceback
602 self.sendEvent(dict(device=cmd.deviceConfig.device,
603 summary="Error loading parser %s" % cmd.parser,
604 component="zencommand",
605 message=traceback.format_exc(),
606 agent="zencommand",
607 ))
608 return
609 parser.processResults(cmd, results)
610
611 for ev in results.events:
612 self.sendEvent(ev, device=cmd.deviceConfig.device)
613
614 for dp, value in results.values:
615 self.log.debug("Storing %s = %s into %s" % (dp.id, value, dp.rrdPath))
616 value = self.rrd.save(dp.rrdPath,
617 value,
618 dp.rrdType,
619 dp.rrdCreateCommand,
620 cmd.cycleTime,
621 dp.rrdMin,
622 dp.rrdMax)
623 self.log.debug("RRD save result: %s" % value)
624 for ev in self.thresholds.check(dp.rrdPath, time.time(), value):
625 eventKey = cmd.getEventKey(dp)
626 if 'eventKey' in ev:
627 ev['eventKey'] = '%s|%s' % (eventKey, ev['eventKey'])
628 else:
629 ev['eventKey'] = eventKey
630 ev['component'] = dp.component
631 self.sendEvent(ev)
632
674
675 return drive(doFetchConfig)
676
677
678 - def start(self, driver):
679 """
680 Fetch the configuration and return a deferred for its completion.
681 Also starts the config cycle
682 """
683 ex = None
684 try:
685 self.log.debug('Fetching configuration from zenhub')
686 yield self.fetchConfig()
687 driver.next()
688 self.log.debug('Finished config fetch')
689 except Exception, ex:
690 self.log.exception(ex)
691 driveLater(self.configCycleInterval * 60, self.start)
692 if ex:
693 raise ex
694
696 RRDDaemon.buildOptions(self)
697
698 self.parser.add_option('--parallel', dest='parallel',
699 default=10, type='int',
700 help="Number of devices to collect at one time")
701
706
707
if __name__ == '__main__':
    # The daemon class is imported through its package path instead of
    # being used straight from this script's namespace.
    # NOTE(review): presumably so its classes are known under
    # Products.ZenRRD.zencommand rather than __main__ -- confirm.
    from Products.ZenRRD.zencommand import zencommand
    daemon = zencommand()
    daemon.run()
712