| Trees | Indices | Help |
|
|---|
|
|
1 ###########################################################################
2 #
3 # This program is part of Zenoss Core, an open source monitoring platform.
4 # Copyright (C) 2007, Zenoss Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify it
7 # under the terms of the GNU General Public License version 2 as published by
8 # the Free Software Foundation.
9 #
10 # For complete information please visit: http://www.zenoss.com/oss/
11 #
12 ###########################################################################
13
14 __doc__=''' ZenCommand
15
16 Run Command pluggins periodically.
17
18 $Id$'''
19
20 __version__ = "$Revision$"[11:-2]
21
22 import time
23 import logging
24 log = logging.getLogger("zen.zencommand")
25
26 from twisted.internet import reactor, defer
27 from twisted.internet.protocol import ProcessProtocol
28 from twisted.python import failure
29
30 import Globals
31 from Products.ZenUtils.Driver import drive, driveLater
32 from Products.ZenEvents import Event
33
34 from Products.ZenRRD.RRDDaemon import RRDDaemon
35 from Products.ZenRRD.Thresholds import Thresholds
36 from Products.ZenRRD.RRDUtil import RRDUtil
37 from Products.DataCollector.SshClient import SshClient
38
39 from sets import Set
40
41 import re
42 # how to parse each value from a nagios command
# Matches one Nagios-style token "label=value" followed by up to five
# ';'-separated fields (e.g. unit/thresholds), which are matched but unused.
# parseResults reads group(1) as the label -- bare, or quoted with the quotes
# included -- and group(3) as the numeric value; group(2) is the quoted text.
43 NagParser = re.compile(r"""([^ =']+|'(.*)'+)=([-0-9.]+)([^;]*;?){0,5}""")
44 # how to parse each value from a cacti command
# Matches one Cacti-style "label:value" token; groups are used as above.
45 CacParser = re.compile(r"""([^ :']+|'(.*)'+):([-0-9.]+)""")
46
# Cap on cached SSH connections: SshPool.trimConnections closes connections
# while the pool holds more than this many.
47 MAX_CONNECTIONS=256
48
# Descriptions of common shell exit statuses. zencommand.getExitMessage looks
# codes up here first; parseResults uses that text as the event message when a
# command produced no message of its own.
49 EXIT_CODE_MAPPING = {
50 0:'Success',
51 1:'General error',
52 2:'Misuse of shell builtins',
53 126:'Command invoked cannot execute, permissions problem or command is not an executable',
54 127:'Command not found',
55 128:'Invalid argument to exit, exit takes only integers in the range 0-255',
56 130:'Fatal error signal: 2, Command terminated by Control-C'
57 }
58
64
65
72
73
# Body of the module-level Timeout helper. Its def line (source line 74) is
# not part of this listing. From the names used below and the call
# Timeout(defer.Deferred(), cmd.commandTimeout, cmd), the signature is
# presumably Timeout(deferred, seconds, obj) -- TODO confirm.
# Arms a reactor timer for `seconds`. If the timer fires first, `deferred` is
# errbacked with TimeoutError(obj). Returns the same `deferred`.
75 "Cause an error on a deferred when it is taking too long to complete"
76
77 def _timeout(deferred, obj):
78 "took too long... call an errback"
79 deferred.errback(failure.Failure(TimeoutError(obj)))
80
81
82 def _cb(arg, timer):
83 "the command finished, possibly by timing out"
# Runs on success and on failure: cancel the timer unless it already fired,
# then pass the result through unchanged.
84 if not timer.called:
85 timer.cancel()
86 return arg
87
88
# Once the timer has fired, the deferred has already been called. Code that
# fires it later (ProcessRunner.processEnded, MySshClient.addResult) therefore
# checks `d.called` first.
89 timer = reactor.callLater(seconds, _timeout, deferred, obj)
90 deferred.addBoth(_cb, timer)
91 return deferred
92
93
# class ProcessRunner -- its class line (source line 94) and its def lines are
# not part of this listing. It passes itself to reactor.spawnProcess as the
# process protocol, so it is presumably a ProcessProtocol subclass (imported
# above) -- TODO confirm. Runs one command locally and reports completion
# through a Deferred.
95 "Provide deferred process execution"
# stopped: the pending Deferred returned by start(); cleared once fired.
# exitCode: exit status recorded by processEnded.
# output: collected child output; reduced to its first line in processEnded.
96 stopped = None
97 exitCode = None
98 output = ''
99
# start(cmd) -- def line (source line 100) not shown. Spawns
# /bin/sh -c "exec <cmd.command>" and returns a Deferred, wrapped by Timeout
# with cmd.commandTimeout, that processEnded fires with this runner.
101 "Kick off the process: run it local"
102 log.debug('running %r' % cmd.command)
103
104 import string
105 shell = '/bin/sh'
# NOTE(review): cmd.command is interpolated into a shell command line without
# quoting, so the configured command string is interpreted by /bin/sh on the
# collector host. This is only acceptable if the command configuration is
# trusted -- confirm.
106 self.cmdline = (shell, '-c', 'exec %s' % cmd.command)
107 self.command = string.join(self.cmdline, ' ')
108 log.debug('cmd line: %r' % self.command)
109 reactor.spawnProcess(self, shell, self.cmdline, env=None)
110
111 d = Timeout(defer.Deferred(), cmd.commandTimeout, cmd)
112 self.stopped = d
# If the Timeout fires, self.timeout kills the child and passes the failure on.
113 self.stopped.addErrback(self.timeout)
114 return d
115
# timeout(value) -- def line (source line 116) not shown. Errback on the
# Timeout deferred: sends 'KILL' to the child, then returns the failure
# unchanged so later errbacks still see it.
# NOTE(review): addErrback(self.timeout) passes only the failure, so `cmd` is
# not bound in this method. Reaching the except branch would raise NameError.
# ProcessExitedAlready is also not imported by the visible import block
# (Twisted defines it in twisted.internet.error). Suggested fix: import it and
# log self.command, or unpack the Cmd from value.value.args as
# SshRunner.timeout does.
117 "Kill a process if it takes too long"
118 try:
119 self.transport.signalProcess('KILL')
120 except ProcessExitedAlready:
121 log.debug("Command already exited on device %s: %s" % (cmd.device,
122 cmd.command))
123 return value
124
125
# Source lines 126-128 are not part of this listing. Nothing visible appends
# to self.output, so they presumably hold an output handler (e.g. outReceived)
# that accumulates it -- TODO confirm.
129
130
# processEnded(reason) -- def line (source line 131) not shown. Records the
# exit code and keeps only the first stripped line of output. Then fires the
# start() deferred with this runner, unless the timeout already fired it.
132 "notify the starter that their process is complete"
133 self.exitCode = reason.value.exitCode
134 log.debug('Received exit code: %s' % self.exitCode)
135 log.debug('Command: %r' % self.command)
136 log.debug('Output: %r' % self.output)
137
138 self.output = [s.strip() for s in self.output.split('\n')][0]
139 if self.stopped:
140 d, self.stopped = self.stopped, None
141 if not d.called:
142 d.callback(self)
143
144
# class MySshClient -- class line (source line 145) not shown. It presumably
# subclasses SshClient, whose methods it calls explicitly below. Maps each
# submitted command string to a Deferred that fires with (data, code).
146 "Connection to SSH server at the remote device"
147
# Source lines 148-150 are not shown. self.defers (used below) is presumably
# initialized there -- TODO confirm.
151
152
# addCommand(command) -- def line not shown. Queues `command` on the
# connection and returns a Deferred that addResult fires with (data, code).
# NOTE(review): defers is keyed by the command string. Submitting the same
# command twice before its first result arrives overwrites the first Deferred:
# that Deferred can then only fire via its Timeout, and the second result hits
# KeyError in addResult.
154 "Run a command against the server"
155 d = defer.Deferred()
156 self.defers[command] = d
157 SshClient.addCommand(self, command)
158 return d
159
160
# addResult(command, data, code) -- def line not shown. Records the result via
# SshClient.addResult, then fires the waiting Deferred unless a timeout already
# fired it. defers.pop raises KeyError if no Deferred is pending for `command`.
162 "Forward the results of the command execution to the starter"
163 SshClient.addResult(self, command, data, code)
164 d = self.defers.pop(command)
165 if not d.called:
166 d.callback((data, code))
167
168
# Source lines 169-171 are not shown.
172
173
# clientFinished() -- def line not shown. After SshClient.clientFinished runs,
# clears the per-connection command and result lists.
175 "We don't need to track commands/results when they complete"
176 SshClient.clientFinished(self)
177 self.commands = []
178 self.results = []
179
180
# class SshPool -- class line (source line 181) not shown. Caches one
# MySshClient per device in self.pool, a dict keyed by device name. The dict
# is presumably created in source lines 184-185, which are not shown.
182 "Cache all the Ssh connections so they can be managed"
183
186
187
# get(cmd) -- def line not shown. Returns the cached client for cmd.device. On
# first use it creates one from the command's credentials and timeouts and
# starts it with run().
189 "Make an ssh connection if there isn't one available"
190 result = self.pool.get(cmd.device, None)
191 if result is None:
192 log.debug("Creating connection to %s", cmd.device)
193 options = Options(cmd.username, cmd.password,
194 cmd.loginTimeout, cmd.commandTimeout, cmd.keyPath)
195 # New param KeyPath
196 result = MySshClient(cmd.device, cmd.ipAddress, cmd.port,
197 options=options)
198 result.run()
199 self.pool[cmd.device] = result
200 return result
201
202
# _close(device) -- def line not shown. Removes the device's connection,
# closing its transport if it has one. Does nothing for unknown devices.
204 "close the SSH connection to a device, if it exists"
205 c = self.pool.get(device, None)
206 if c:
207 log.debug("Closing connection to %s", device)
208 if c.transport:
209 c.transport.loseConnection()
210 del self.pool[device]
211
212
# Source lines 213-215 are not shown. They presumably hold close(cmd), which
# SshRunner calls with a Cmd -- TODO confirm.
216
217
# trimConnections(schedule) -- def line not shown. processSchedule passes the
# schedule already sorted by nextRun(). Popping from the end of `devices`
# therefore closes first the connections needed furthest in the future.
219 "reduce the number of connections using the schedule for guidance"
220 # compute device list in order of next use
# NOTE(review): `c.device not in devices` is a linear list scan, which makes
# this O(n^2) in the schedule length. A companion set would keep it linear.
221 devices = []
222 for c in schedule:
223 if c.device not in devices:
224 devices.append(c.device)
225 # close as many devices as needed
# Only devices still in the schedule are candidates. Connections for devices
# that have left the schedule (e.g. via remote_deleteDevice) are never closed
# here.
226 while devices and len(self.pool) > MAX_CONNECTIONS:
227 self._close(devices.pop())
228
229
# class SshRunner -- class line (source line 230) not shown. Runs one Cmd over
# a connection borrowed from the SshPool that Cmd.start passes in.
231 "Run a single command across a cached Ssh connection"
232 exitCode = None
233 output = None
234
# Source lines 235-236 are not shown. They presumably store the pool as
# self.pool -- TODO confirm.
237
238
# start(cmd) -- def line not shown. Submits cmd.command on the device's pooled
# connection, wrapped in Timeout(cmd.commandTimeout). If submitting raises,
# the connection is closed and an already-failed Deferred is returned.
240 "Initiate a command on the remote device"
# NOTE(review): self.defer is assigned but never used in the visible code.
241 self.defer = defer.Deferred()
242 c = self.pool.get(cmd)
243 try:
244 d = Timeout(c.addCommand(cmd.command), cmd.commandTimeout, cmd)
245 except Exception, ex:
246 log.warning('Error starting command: %s', ex)
247 self.pool.close(cmd)
248 return defer.fail(ex)
249 d.addErrback(self.timeout)
# processEnded lives in source lines 263-268, which are not shown. It
# presumably turns the (data, code) result into exitCode/output -- TODO confirm.
250 d.addBoth(self.processEnded)
251 return d
252
253
# timeout(arg) -- def line not shown. Errback for the Timeout: recovers the
# Cmd from the TimeoutError's args, closes that device's connection, and passes
# the failure on.
255 "Deal with slow executing command/connection (close it)"
256 cmd, = arg.value.args
257 # we could send a kill signal, but then we would need to track
258 # the command channel to send it to: just close the connection
259 self.pool.close(cmd)
260 return arg
261
262
269
270
# class Cmd -- class line (source line 271) not shown. Represents one
# scheduled command: the configuration copied in by updateConfig, plus run
# bookkeeping (lastStart/lastStop timestamps and the last result).
272 "Holds the config of every command to be run"
273 device = None
274 ipAddress = None
275 port = 22
276 username = None
277 password = None
278 command = None
279 useSsh = False
280 cycleTime = None
281 eventClass = None
282 eventKey = None
283 component = None
284 severity = 3
285 lastStart = 0
286 lastStop = 0
287 result = None
288
289
# Source lines 290-291 and 293-294 are not shown. running(), which is called
# below, is presumably defined there -- TODO confirm.
292
295
296
# name() -- def line not shown. Returns the executable's basename plus its
# arguments, for log messages. Appending ' ' guarantees the split yields two
# parts.
298 cmd, args = (self.command + ' ').split(' ', 1)
299 cmd = cmd.split('/')[-1]
300 return '%s %s' % (cmd, args)
301
302
# nextRun() -- def line not shown. While running, the next run is due one
# cycleTime after the last start; otherwise one cycleTime after the last stop.
304 if self.running():
305 return self.lastStart + self.cycleTime
306 return self.lastStop + self.cycleTime
307
308
# start(pool) -- def line not shown. Runs this command over SSH (sharing
# `pool`) or as a local process and records the start time. Returns the
# runner's Deferred with processEnded chained on.
310 if self.useSsh:
311 pr = SshRunner(pool)
312 else:
313 pr = ProcessRunner()
314 d = pr.start(self)
315 self.lastStart = time.time()
316 log.debug('Process %s started' % self.name())
317 d.addBoth(self.processEnded)
318 return d
319
320
# processEnded(pr) -- def line not shown. Stores the outcome (runner or
# Failure) in self.result and records the stop time. Returns self on success;
# a Failure is passed through unchanged.
322 self.result = pr
323 self.lastStop = time.time()
324 if not isinstance(pr, failure.Failure):
325 log.debug('Process %s stopped (%s), %f elapsed' % (
326 self.name(),
327 pr.exitCode,
328 self.lastStop - self.lastStart))
329 return self
330 return pr
331
332
# updateConfig(cfg) -- def line not shown. Copies configuration fields onto
# this Cmd from `cfg`, which zencommand.updateConfig builds with CommandConfig.
# cycleTime is floored at 1 second. points maps each datapoint label to the
# rest of its config tuple, which parseResults unpacks as
# (path, type, command, (minv, maxv)).
334 self.lastChange = cfg.lastChange
335 self.device = cfg.device
336 self.ipAddress = cfg.ipAddress
337 self.port = cfg.port
338 self.username = str(cfg.username)
339 self.password = str(cfg.password)
340 self.loginTimeout = cfg.loginTimeout
341 self.commandTimeout = cfg.commandTimeout
342 self.keyPath = cfg.keyPath
343 self.useSsh = cfg.useSsh
344 self.cycleTime = max(cfg.cycleTime, 1)
345 self.eventKey = cfg.eventKey
346 self.eventClass = cfg.eventClass
347 self.component = cfg.component
348 self.severity = cfg.severity
349 self.command = str(cfg.command)
# NOTE(review): `before` is never used. This line also reads self.points,
# which is not among the visible class attributes, so it must be initialized
# somewhere not shown before the first call.
350 self.points, before = {}, self.points
351 for p in cfg.points:
352 self.points[p[0]] = p[1:]
353
# class Options -- class line (source line 354) not shown. Holds the
# connection settings that SshPool.get hands to MySshClient. loginTries,
# searchPath and existenceTest are fixed class-level defaults, presumably read
# by SshClient (not visible here).
355 loginTries=1
356 searchPath=''
357 existenceTest=None
358
# __init__ -- def line not shown. SshPool.get calls it positionally as
# Options(username, password, loginTimeout, commandTimeout, keyPath).
360 self.username = username
361 self.password = password
362 self.loginTimeout=loginTimeout
363 self.commandTimeout=commandTimeout
364 self.keyPath = keyPath
365
366
368
# class zencommand -- class line (source line 367) not shown. This is the
# daemon that schedules and runs every configured command. It adds
# 'CommandConfig' to RRDDaemon's initial services.
369 initialServices = RRDDaemon.initialServices + ['CommandConfig']
370
# __init__ -- def line not shown.
372 RRDDaemon.__init__(self, 'zencommand')
# All Cmd objects; processSchedule re-sorts them by nextRun().
373 self.schedule = []
# Pending reactor.callLater handle for the next processSchedule wakeup.
374 self.timeout = None
# (device, eventClass, eventKey) keys that have an outstanding problem event,
# so that a later success sends a clearing event.
375 self.deviceIssues = Set()
# SSH connections shared by all commands.
376 self.pool = SshPool()
377
# remote_deleteDevice(doomed) -- def line not shown. Drops every scheduled
# command for the device `doomed`.
# NOTE(review): any pooled SSH connection to that device stays open;
# SshPool.trimConnections only considers devices still in the schedule.
379 self.log.debug("Async delete device %s" % doomed)
380 self.schedule = [c for c in self.schedule if c.device != doomed]
381
# Source lines 382-384 are not shown.
385
# remote_updateDeviceList(devices) -- def line not shown. `devices` is passed
# to dict(), so it is presumably a sequence of (device name, lastChange)
# pairs. Commands for devices absent from it are dropped from the schedule.
# Devices flagged by the comparison below have their config re-fetched.
387 self.log.debug("Async update device list %s" % devices)
388 updated = []
389 lastChanges = dict(devices) # map device name to last change
390 keep = []
391 for cmd in self.schedule:
392 if cmd.device in lastChanges:
# NOTE(review): this refetches only when the cached lastChange is *newer*
# than the reported one. If both are modification times, a device changed on
# the server reports a larger value, so the comparison looks inverted --
# confirm.
393 if cmd.lastChange > lastChanges[cmd.device]:
394 updated.append(cmd.device)
395 keep.append(cmd)
396 else:
397 log.info("Removing all commands for %s", cmd.device)
398 self.schedule = keep
# NOTE(review): `updated` is built only from already-scheduled commands, so a
# newly reported device with no commands yet is never fetched.
399 if updated:
400 log.info("Fetching the config for %s", updated)
# NOTE(review): fetchConfig calls getDataSourceCommands with a list of device
# names, but here it receives the (name, lastChange) pairs. `updated` looks
# like the intended argument -- confirm against the CommandConfig service.
401 d = self.model().callRemote('getDataSourceCommands', devices)
402 d.addCallback(self.updateConfig, updated)
403 d.addErrback(self.error)
404
406 expected = Set(expected)
407 current = {}
408 for c in self.schedule:
409 if c.device in expected:
410 current[c.device,c.command] = c
411 # keep all the commands we didn't ask for
412 update = [c for c in self.schedule if c.device not in expected]
413 for c in config:
414 (lastChange, device, ipAddress, port,
415 username, password,
416 loginTimeout, commandTimeout,
417 keyPath, maxOids, commandPart, threshs) = c
418 self.thresholds.updateList(threshs)
419 if self.options.device and self.options.device != device:
420 continue
421 for cmd in commandPart:
422 (useSsh, cycleTime,
423 component, eventClass, eventKey, severity, command, points) = cmd
424 obj = current.setdefault((device,command), Cmd())
425 del current[(device,command)]
426 obj.updateConfig(CommandConfig(locals()))
427 update.append(obj)
428 for device, command in current.keys():
429 log.info("Deleting command %s from %s", device, command)
430 self.schedule = update
431 self.processSchedule()
432
# heartbeatCycle -- def line (source line 433) not shown. Sends a heartbeat
# and re-arms itself every heartBeatTimeout/3 seconds. connected() also uses it
# as a Deferred callback, which passes a result argument, so its signature
# presumably accepts an optional argument -- TODO confirm.
434 "There is no master 'cycle' to send the heartbeat"
435 self.heartbeat()
436 reactor.callLater(self.heartBeatTimeout/3, self.heartbeatCycle)
437
# processSchedule -- def line (source line 438) not shown.
439 """Run through the schedule and start anything that needs to be done.
440 Set a timer if we have nothing to do.
441 """
442 log.info("%s - schedule has %d commands", '-'*10, len(self.schedule))
# Without --cycle: stop once every command has started at least once and none
# is still running. The for/else reaches self.stop() only when no break
# happened.
443 if not self.options.cycle:
444 for cmd in self.schedule:
445 if cmd.running() or cmd.lastStart == 0:
446 break
447 else:
448 self.stop()
449 return
450 try:
# Cancel any pending wakeup; a new one is set below if needed.
451 if self.timeout and not self.timeout.called:
452 self.timeout.cancel()
453 self.timeout = None
454 def compare(x, y):
455 return cmp(x.nextRun(), y.nextRun())
456 self.schedule.sort(compare)
457 self.pool.trimConnections(self.schedule)
458 earliest = None
459 running = 0
460 now = time.time()
461 for c in self.schedule:
462 if c.running():
463 running += 1
# Start due commands in nextRun() order, keeping at most --parallel running.
# The first command not yet due sets the wakeup delay (at least 1 second).
# If the loop ends for any other reason, no timer is set; the next call then
# comes from finished() when a running command completes.
# NOTE(review): commands that are still running are not skipped. A running
# command's nextRun() is lastStart + cycleTime, so a run that outlasts its
# cycle would be started again while the first is still in flight. Consider
# `if c.running(): continue` -- confirm intended behaviour.
464 for c in self.schedule:
465 if running >= self.options.parallel:
466 break
467 if c.nextRun() <= now:
468 c.start(self.pool).addBoth(self.finished)
469 running += 1
470 else:
471 earliest = c.nextRun() - now
472 break
473 if earliest is not None:
474 log.debug("Next command in %f seconds", earliest)
475 self.timeout = reactor.callLater(max(1, earliest),
476 self.processSchedule)
477 except Exception, ex:
478 log.exception(ex)
479
480
# finished(cmd) -- def line (source line 481) not shown. processSchedule
# attaches this to each started command. `cmd` is the Cmd that
# Cmd.processEnded returns on success, or the Failure it passed through.
# Reports the outcome, then reschedules.
# NOTE(review): if parseResults raises, processSchedule is skipped. Unless a
# wakeup timer is already pending, the schedule could then stall; a
# try/finally around the report would guarantee the reschedule.
482 if isinstance(cmd, failure.Failure):
483 self.error(cmd)
484 else:
485 self.parseResults(cmd)
486 self.processSchedule()
487
488
# error(err) -- def line (source line 489) not shown. Errback for command
# failures and config-fetch failures. A TimeoutError is logged, recorded in
# deviceIssues (so a later success sends a clear), and reported as an event at
# the command's configured severity. Any other failure is only logged.
490 if isinstance(err.value, TimeoutError):
491 cmd, = err.value.args
492 msg = "Command timed out on device %s: %s" % (cmd.device, cmd.command)
493 log.warning(msg)
494 issueKey = cmd.device, cmd.eventClass, cmd.eventKey
495 self.deviceIssues.add(issueKey)
496 self.sendEvent(dict(device=cmd.device,
497 eventClass=cmd.eventClass,
498 eventKey=cmd.eventKey,
499 component=cmd.component,
500 severity=cmd.severity,
501 summary=msg))
502 else:
# NOTE(review): log.exception is meant for use inside an except block. Called
# from an errback there is likely no active exception, so the Failure's own
# traceback (err.getTraceback()) is not logged.
503 log.exception(err.value)
504
# getExitMessage(exitCode) -- def line (source line 505) not shown. Returns a
# readable explanation of a shell exit status:
#  - known codes come from EXIT_CODE_MAPPING;
#  - 255 and above are reported as out of range;
#  - other codes above 128 are reported as "Fatal error signal: <code - 128>";
#  - anything else is reported as unknown.
# NOTE(review): `exitCode in EXIT_CODE_MAPPING` is enough; in Python 2,
# .keys() builds a list on every call.
506 if exitCode in EXIT_CODE_MAPPING.keys():
507 return EXIT_CODE_MAPPING[exitCode]
508 elif exitCode >= 255:
509 return 'Exit status out of range, exit takes only integer arguments in the range 0-255'
510 elif exitCode > 128:
511 return 'Fatal error signal: %s' % (exitCode-128)
512 return 'Unknown error code: %s' % exitCode
513
# parseResults(cmd) -- def line (source line 514) not shown. Turns a finished
# command's output and exit code into events, RRD updates and threshold
# checks.
515 log.debug('The result of "%s" was "%s"', cmd.command, cmd.result.output)
516 output = cmd.result.output
517 exitCode = cmd.result.exitCode
518 severity = cmd.severity
519 issueKey = cmd.device, cmd.eventClass, cmd.eventKey
# Split the output into a message and performance data:
#  - text after the first '|' is perf data;
#  - output that looks like Cacti label:value pairs is all perf data;
#  - otherwise the output is all message.
520 if output.find('|') >= 0:
521 msg, values = output.split('|', 1)
522 elif CacParser.search(output):
523 msg, values = '', output
524 else:
525 msg, values = output, ''
# No message text: describe the command and its exit code instead.
526 msg = msg.strip() or 'Cmd: %s - Code: %s - Msg: %s' % (cmd.command, exitCode, self.getExitMessage(exitCode))
# Exit 0 clears; exit 2 raises the configured severity by one (capped at 5);
# other codes keep the configured severity. Presumably this follows the Nagios
# plugin convention (2 = CRITICAL).
527 if exitCode == 0:
528 severity = 0
529 elif exitCode == 2:
530 severity = min(severity + 1, 5)
# Send an event for any problem, or a clear (severity 0) if this command had an
# outstanding issue. Then remember or forget the issue accordingly.
531 if severity or issueKey in self.deviceIssues:
532 self.sendThresholdEvent(device=cmd.device,
533 summary=msg,
534 severity=severity,
535 message=msg,
536 performanceData=values,
537 eventKey=cmd.eventKey,
538 eventClass=cmd.eventClass,
539 component=cmd.component)
540 self.deviceIssues.add(issueKey)
541 if severity == 0:
542 self.deviceIssues.discard(issueKey)
543
# NOTE(review): splitting on single spaces breaks quoted labels that contain
# spaces, and for quoted labels group(1) keeps the surrounding quotes. Such
# labels probably never match cmd.points -- confirm against the configured
# datapoint names.
544 for value in values.split(' '):
# Tokens with '=' after the first character use the Nagios parser; all others
# use the Cacti parser.
545 if value.find('=') > 0:
546 parts = NagParser.match(value)
547 else:
548 parts = CacParser.match(value)
549 if not parts: continue
# Presumably undoes ''-escaping of a literal quote inside a label.
550 label = parts.group(1).replace("''", "'")
551 try:
552 value = float(parts.group(3))
# NOTE(review): bare except. Only ValueError is expected here (e.g. '-' or
# '1.2.3', which the [-0-9.]+ group accepts). 'U' is presumably RRD's
# "unknown" marker.
553 except:
554 value = 'U'
# Only labels configured as datapoints are stored. Save to RRD, check
# thresholds against the value rrd.save returns, and send the resulting events
# tagged with this command's event fields.
555 if cmd.points.has_key(label):
556 path, type, command, (minv, maxv) = cmd.points[label]
557 log.debug("storing %s = %s in: %s" % (label, value, path))
558 value = self.rrd.save(path, value, type, command,
559 cmd.cycleTime, minv, maxv)
560 log.debug("rrd save result: %s" % value)
561 for ev in self.thresholds.check(path, time.time(), value):
562 ev['eventKey'] = cmd.eventKey
563 ev['eventClass'] = cmd.eventClass
564 ev['component'] = cmd.component
565 self.sendThresholdEvent(**ev)
566
# Body of fetchConfig -- its def line (source line 567) is not shown. Returns
# the Deferred from drive(); it completes once the nested generator has loaded,
# in order: property items, the default RRD create command, threshold classes,
# and the command configuration. Each `yield` hands a remote call's Deferred to
# the Driver. driver.next() presumably returns that call's result (or raises
# its error) -- see Products.ZenUtils.Driver.
568 def doFetchConfig(driver):
569 try:
570 yield self.model().callRemote('propertyItems')
571 self.setPropertyItems(driver.next())
572
573 yield self.model().callRemote('getDefaultRRDCreateCommand')
574 createCommand = driver.next()
575
576 yield self.model().callRemote('getThresholdClasses')
577 self.remote_updateThresholdClasses(driver.next())
578
# With --device only that device is requested; an empty list presumably asks
# for every device -- confirm.
579 devices = []
580 if self.options.device:
581 devices = [self.options.device]
582 yield self.model().callRemote('getDataSourceCommands',
583 devices)
# Without --device, the currently scheduled devices are passed as `expected`,
# so updateConfig replaces their commands with the fetched ones.
584 if not devices:
585 devices = list(Set([c.device for c in self.schedule]))
586 self.updateConfig(driver.next(), devices)
587
# NOTE(review): updateConfig above already calls processSchedule, so commands
# may start before self.rrd is (re)created here. This relies on results
# arriving asynchronously, after this generator has finished.
588 self.rrd = RRDUtil(createCommand, 60)
589
590 except Exception, ex:
591 log.exception(ex)
592 raise
593
594 return drive(doFetchConfig)
595
596
# start -- def line (source line 597) not shown. A Driver generator, run via
# drive(self.start) in connected() and re-armed via driveLater. It fetches the
# config, then schedules itself again after configCycleInterval * 60
# (presumably minutes converted to seconds) whether or not the fetch
# succeeded. A fetch error is re-raised after that reschedule.
598 """Fetch the configuration and return a deferred for its completion.
599 Also starts the config cycle"""
600 ex = None
601 try:
602 log.debug('Fetching config')
603 yield self.fetchConfig()
604 driver.next()
605 log.debug('Finished config fetch')
606 except Exception, ex:
607 log.exception(ex)
608 driveLater(self.configCycleInterval * 60, self.start)
# Re-raised outside the except block, so the original traceback (already
# logged above) is not carried along.
609 if ex:
610 raise ex
611
612
# buildOptions -- def line (source line 613) not shown. Adds --parallel on top
# of RRDDaemon's options. processSchedule uses it as the maximum number of
# commands running at once.
# NOTE(review): the help text says "devices", but processSchedule counts
# running commands, not devices.
614 RRDDaemon.buildOptions(self)
615
616 self.parser.add_option('--parallel', dest='parallel',
617 default=10, type='int',
618 help="number of devices to collect at one time")
619
# connected -- def line (source line 620) not shown. Kicks off the daemon's
# work: runs start (the config fetch), then processSchedule on success or
# errorStop on failure. With --cycle, it also starts the heartbeat loop.
# NOTE(review): addCallbacks/addCallback pass the previous result as an
# argument, so processSchedule and heartbeatCycle presumably accept an extra
# optional parameter. Their def lines are not shown -- confirm.
621 d = drive(self.start).addCallbacks(self.processSchedule, self.errorStop)
622 if self.options.cycle:
623 d.addCallback(self.heartbeatCycle)
624
625
# Script entry point: build the daemon and run it.
626 if __name__ == '__main__':
627 z = zencommand()
628 z.run()
629
| Trees | Indices | Help |
|
|---|
| Generated by Epydoc 3.0beta1 on Thu Oct 25 16:28:32 2007 | http://epydoc.sourceforge.net |