1
2
3
4
5
6
7
8
9
10
11
12
13
__doc__ = ''' ZenCommand

Run Command plugins periodically.

Each configured command is run either locally (through /bin/sh) or over a
cached SSH connection to the device.  Text after a '|' in the output (or the
whole output, when it looks like "label:value" pairs) is parsed as
performance data, which is stored in RRD and checked against thresholds;
events are raised according to the command's exit code.

$Id$'''

# "$Revision$"[11:-2] strips the "$Revision: " prefix and " $" suffix once the
# keyword has been expanded by version control; unexpanded, it yields ''.
__version__ = "$Revision$"[11:-2]
21
22 import time
23 import logging
24 log = logging.getLogger("zen.zencommand")
25
26 from twisted.internet import reactor, defer
27 from twisted.internet.protocol import ProcessProtocol
28 from twisted.python import failure
29
30 import Globals
31 from Products.ZenUtils.Driver import drive, driveLater
32 from Products.ZenEvents import Event
33
34 from Products.ZenRRD.RRDDaemon import RRDDaemon
35 from Products.ZenRRD.Thresholds import Thresholds
36 from Products.ZenRRD.RRDUtil import RRDUtil
37 from Products.DataCollector.SshClient import SshClient
38
39 from sets import Set
40
41 import re
42
# Performance-data token "label=value", optionally followed by
# ';'-separated fields.  Group 1 is the (possibly quoted) label and
# group 3 is the numeric value.
NagParser = re.compile(r"""([^ =']+|'(.*)'+)=([-0-9.]+)([^;]*;?){0,5}""")

# Performance-data token "label:value".  Same group layout as NagParser:
# group 1 is the label and group 3 is the numeric value.
CacParser = re.compile(r"""([^ :']+|'(.*)'+):([-0-9.]+)""")

# NOTE(review): not referenced in the visible part of this file; presumably
# an upper bound on cached connections -- confirm against its users.
MAX_CONNECTIONS = 256

# Human-readable descriptions of common shell exit statuses.  Codes that are
# not listed here have no entry.
EXIT_CODE_MAPPING = dict([
    (0, 'Success'),
    (1, 'General error'),
    (2, 'Misuse of shell builtins'),
    (126, 'Command invoked cannot execute, permissions problem or command is not an executable'),
    (127, 'Command not found'),
    (128, 'Invalid argument to exit, exit takes only integers in the range 0-255'),
    (130, 'Fatal error signal: 2, Command terminated by Control-C'),
])
58
61 d = dictionary.copy()
62 d['self'] = self
63 self.__dict__.update(dictionary)
64
65
67 "Error for a defered call taking too long to complete"
68
72
73
def Timeout(deferred, seconds, obj):
    """Arm a watchdog that fails *deferred* if it takes too long.

    A reactor timer is scheduled for *seconds* from now.  If it fires first,
    the deferred is errbacked with a TimeoutError wrapping *obj*.  A handler
    attached with addBoth cancels the still-pending timer as soon as the
    deferred fires, and passes the result (or failure) through unchanged.

    Returns the same deferred so the call can be chained.
    """
    def _expire(d, subject):
        "Timer won the race: fail the deferred with a TimeoutError."
        d.errback(failure.Failure(TimeoutError(subject)))

    watchdog = reactor.callLater(seconds, _expire, deferred, obj)

    def _disarm(result, pending):
        "Deferred fired: stop the timer if it has not gone off yet."
        if not pending.called:
            pending.cancel()
        return result

    deferred.addBoth(_disarm, watchdog)
    return deferred
92
93
95 "Provide deferred process execution"
96 stopped = None
97 exitCode = None
98 output = ''
99
101 "Kick off the process: run it local"
102 log.debug('running %r' % cmd.command)
103
104 import string
105 shell = '/bin/sh'
106 self.cmdline = (shell, '-c', 'exec %s' % cmd.command)
107 self.command = string.join(self.cmdline, ' ')
108 log.debug('cmd line: %r' % self.command)
109 reactor.spawnProcess(self, shell, self.cmdline, env=None)
110
111 d = Timeout(defer.Deferred(), cmd.commandTimeout, cmd)
112 self.stopped = d
113 self.stopped.addErrback(self.timeout)
114 return d
115
117 "Kill a process if it takes too long"
118 try:
119 self.transport.signalProcess('KILL')
120 except ProcessExitedAlready:
121 log.debug("Command already exited on device %s: %s" % (cmd.device,
122 cmd.command))
123 return value
124
125
127 "Store up the output as it arrives from the process"
128 self.output += data
129
130
143
144
146 "Connection to SSH server at the remote device"
147
151
152
159
160
167
168
def check(self, ip, timeout=2):
    """Replace SshClient's blocking test with a check that always passes.

    *ip* and *timeout* are accepted only for signature compatibility and
    are ignored.
    """
    return True
172
173
175 "We don't need to track commands/results when they complete"
176 SshClient.clientFinished(self)
177 self.commands = []
178 self.results = []
179
180
182 "Cache all the Ssh connections so they can be managed"
183
186
187
188 - def get(self, cmd):
201
202
204 "close the SSH connection to a device, if it exists"
205 c = self.pool.get(device, None)
206 if c:
207 log.debug("Closing connection to %s", device)
208 if c.transport:
209 c.transport.loseConnection()
210 del self.pool[device]
211
212
214 "symetric close that matches get() method"
215 self._close(cmd.device)
216
217
228
229
231 "Run a single command across a cached Ssh connection"
232 exitCode = None
233 output = None
234
237
238
252
253
255 "Deal with slow executing command/connection (close it)"
256 cmd, = arg.value.args
257
258
259 self.pool.close(cmd)
260 return arg
261
262
264 "Deliver ourselves to the starter with the proper attributes"
265 if isinstance(value, failure.Failure):
266 return value
267 self.output, self.exitCode = value
268 return self
269
270
353
365
366
368
369 initialServices = RRDDaemon.initialServices + ['CommandConfig']
370
377
379 self.log.debug("Async delete device %s" % doomed)
380 self.schedule = [c for c in self.schedule if c.device != doomed]
381
383 self.log.debug("Async configuration update")
384 self.updateConfig([config], [config[1]])
385
387 self.log.debug("Async update device list %s" % devices)
388 updated = []
389 lastChanges = dict(devices)
390 keep = []
391 for cmd in self.schedule:
392 if cmd.device in lastChanges:
393 if cmd.lastChange > lastChanges[cmd.device]:
394 updated.append(cmd.device)
395 keep.append(cmd)
396 else:
397 log.info("Removing all commands for %s", cmd.device)
398 self.schedule = keep
399 if updated:
400 log.info("Fetching the config for %s", updated)
401 d = self.model().callRemote('getDataSourceCommands', devices)
402 d.addCallback(self.updateConfig, updated)
403 d.addErrback(self.error)
404
406 expected = Set(expected)
407 current = {}
408 for c in self.schedule:
409 if c.device in expected:
410 current[c.device,c.command] = c
411
412 update = [c for c in self.schedule if c.device not in expected]
413 for c in config:
414 (lastChange, device, ipAddress, port,
415 username, password,
416 loginTimeout, commandTimeout,
417 keyPath, maxOids, commandPart, threshs) = c
418 self.thresholds.updateList(threshs)
419 if self.options.device and self.options.device != device:
420 continue
421 for cmd in commandPart:
422 (useSsh, cycleTime,
423 component, eventClass, eventKey, severity, command, points) = cmd
424 obj = current.setdefault((device,command), Cmd())
425 del current[(device,command)]
426 obj.updateConfig(CommandConfig(locals()))
427 update.append(obj)
428 for device, command in current.keys():
429 log.info("Deleting command %s from %s", device, command)
430 self.schedule = update
431 self.processSchedule()
432
437
439 """Run through the schedule and start anything that needs to be done.
440 Set a timer if we have nothing to do.
441 """
442 log.info("%s - schedule has %d commands", '-'*10, len(self.schedule))
443 if not self.options.cycle:
444 for cmd in self.schedule:
445 if cmd.running() or cmd.lastStart == 0:
446 break
447 else:
448 self.stop()
449 return
450 try:
451 if self.timeout and not self.timeout.called:
452 self.timeout.cancel()
453 self.timeout = None
454 def compare(x, y):
455 return cmp(x.nextRun(), y.nextRun())
456 self.schedule.sort(compare)
457 self.pool.trimConnections(self.schedule)
458 earliest = None
459 running = 0
460 now = time.time()
461 for c in self.schedule:
462 if c.running():
463 running += 1
464 for c in self.schedule:
465 if running >= self.options.parallel:
466 break
467 if c.nextRun() <= now:
468 c.start(self.pool).addBoth(self.finished)
469 running += 1
470 else:
471 earliest = c.nextRun() - now
472 break
473 if earliest is not None:
474 log.debug("Next command in %f seconds", earliest)
475 self.timeout = reactor.callLater(max(1, earliest),
476 self.processSchedule)
477 except Exception, ex:
478 log.exception(ex)
479
480
487
488
490 if isinstance(err.value, TimeoutError):
491 cmd, = err.value.args
492 msg = "Command timed out on device %s: %s" % (cmd.device, cmd.command)
493 log.warning(msg)
494 issueKey = cmd.device, cmd.eventClass, cmd.eventKey
495 self.deviceIssues.add(issueKey)
496 self.sendEvent(dict(device=cmd.device,
497 eventClass=cmd.eventClass,
498 eventKey=cmd.eventKey,
499 component=cmd.component,
500 severity=cmd.severity,
501 summary=msg))
502 else:
503 log.exception(err.value)
504
513
515 log.debug('The result of "%s" was "%s"', cmd.command, cmd.result.output)
516 output = cmd.result.output
517 exitCode = cmd.result.exitCode
518 severity = cmd.severity
519 issueKey = cmd.device, cmd.eventClass, cmd.eventKey
520 if output.find('|') >= 0:
521 msg, values = output.split('|', 1)
522 elif CacParser.search(output):
523 msg, values = '', output
524 else:
525 msg, values = output, ''
526 msg = msg.strip() or 'Cmd: %s - Code: %s - Msg: %s' % (cmd.command, exitCode, self.getExitMessage(exitCode))
527 if exitCode == 0:
528 severity = 0
529 elif exitCode == 2:
530 severity = min(severity + 1, 5)
531 if severity or issueKey in self.deviceIssues:
532 self.sendThresholdEvent(device=cmd.device,
533 summary=msg,
534 severity=severity,
535 message=msg,
536 performanceData=values,
537 eventKey=cmd.eventKey,
538 eventClass=cmd.eventClass,
539 component=cmd.component)
540 self.deviceIssues.add(issueKey)
541 if severity == 0:
542 self.deviceIssues.discard(issueKey)
543
544 for value in values.split(' '):
545 if value.find('=') > 0:
546 parts = NagParser.match(value)
547 else:
548 parts = CacParser.match(value)
549 if not parts: continue
550 label = parts.group(1).replace("''", "'")
551 try:
552 value = float(parts.group(3))
553 except:
554 value = 'U'
555 if cmd.points.has_key(label):
556 path, type, command, (minv, maxv) = cmd.points[label]
557 log.debug("storing %s = %s in: %s" % (label, value, path))
558 value = self.rrd.save(path, value, type, command,
559 cmd.cycleTime, minv, maxv)
560 log.debug("rrd save result: %s" % value)
561 for ev in self.thresholds.check(path, time.time(), value):
562 ev['eventKey'] = cmd.eventKey
563 ev['eventClass'] = cmd.eventClass
564 ev['component'] = cmd.component
565 self.sendThresholdEvent(**ev)
566
593
594 return drive(doFetchConfig)
595
596
597 - def start(self, driver):
611
612
614 RRDDaemon.buildOptions(self)
615
616 self.parser.add_option('--parallel', dest='parallel',
617 default=10, type='int',
618 help="number of devices to collect at one time")
619
624
625
if __name__ == '__main__':
    # Script entry point: build the collector daemon and enter its run loop.
    zencommand().run()
629