| Trees | Indices | Help |
|
|---|
|
|
1 ###########################################################################
2 #
3 # This program is part of Zenoss Core, an open source monitoring platform.
4 # Copyright (C) 2007, 2009, Zenoss Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify it
7 # under the terms of the GNU General Public License version 2 as published by
8 # the Free Software Foundation.
9 #
10 # For complete information please visit: http://www.zenoss.com/oss/
11 #
12 ###########################################################################
13
14 __doc__ = """ZenCommand
15
16 Run Command plugins periodically.
17
18 """
19
20 import time
21 from pprint import pformat
22 import logging
23 log = logging.getLogger("zen.zencommand")
24 from sets import Set
25
26 from twisted.internet import reactor, defer, error
27 from twisted.internet.protocol import ProcessProtocol
28 from twisted.python import failure
29 from twisted.spread import pb
30
31 import Globals
32 from Products.ZenUtils.Driver import drive, driveLater
33 from Products.ZenUtils.Utils import unused, getExitMessage
34 from Products.ZenRRD.RRDDaemon import RRDDaemon
35 from Products.ZenRRD.RRDUtil import RRDUtil
36 from Products.DataCollector.SshClient import SshClient
37 from Products.ZenEvents.ZenEventClasses import Clear, Error
38 from Products.ZenRRD.CommandParser import ParsedResults
39
40 from Products.DataCollector import Plugins
41 unused(Plugins)
42 MAX_CONNECTIONS = 256
43
44
45
54
55
# NOTE(review): the def line was missing from the scraped listing; the
# signature is restored from the call sites Timeout(deferred, seconds, obj)
# and the names used in the body -- confirm against the real file.
def Timeout(deferred, seconds, obj):
    """
    Arm a watchdog on a deferred so that it fails when it runs too long.

    @param deferred: the deferred to guard
    @param seconds: delay before the deferred is errbacked
    @param obj: payload stored in the TimeoutError that is raised
    @return: the same deferred that was passed in
    """

    def _expire(pending, payload):
        "the deadline passed: errback with a TimeoutError wrapping payload"
        pending.errback(failure.Failure(TimeoutError(payload)))

    def _disarm(result, watchdog):
        "the deferred fired, normally or via the timeout: stop the clock"
        if watchdog.called:
            return result
        watchdog.cancel()
        return result

    watchdog = reactor.callLater(seconds, _expire, deferred, obj)
    # runs on success and on failure alike, and passes the result through
    deferred.addBoth(_disarm, watchdog)
    return deferred
74
75
77 """
78 Provide deferred process execution
79 """
80 stopped = None
81 exitCode = None
82 output = ''
83
85 "Kick off the process: run it local"
86 log.debug('running %r' % cmd.command)
87
88 shell = '/bin/sh'
89 self.cmdline = (shell, '-c', 'exec %s' % cmd.command)
90 self.command = ' '.join(self.cmdline)
91 log.debug('cmd line: %r' % self.command)
92 reactor.spawnProcess(self, shell, self.cmdline, env=None)
93
94 d = Timeout(defer.Deferred(), cmd.deviceConfig.commandTimeout, cmd)
95 self.stopped = d
96 self.stopped.addErrback(self.timeout)
97 return d
98
100 "Kill a process if it takes too long"
101 try:
102 self.transport.signalProcess('KILL')
103 except error.ProcessExitedAlready:
104 log.debug("Command already exited: %s" % self.command)
105 return value
106
107
111
112
114 "notify the starter that their process is complete"
115 self.exitCode = reason.value.exitCode
116 log.debug('Received exit code: %s' % self.exitCode)
117 log.debug('Command: %r' % self.command)
118 log.debug('Output: %r' % self.output)
119
120 if self.stopped:
121 d, self.stopped = self.stopped, None
122 if not d.called:
123 d.callback(self)
124
125
127 """
128 Connection to SSH server at the remote device
129 """
130
134
136 "Run a command against the server"
137 d = defer.Deferred()
138 self.defers[command] = d
139 SshClient.addCommand(self, command)
140 return d
141
142
144 "Forward the results of the command execution to the starter"
145 SshClient.addResult(self, command, data, code)
146 d = self.defers.pop(command)
147 if not d.called:
148 d.callback((data, code))
149
150
154
155
157 "We don't need to track commands/results when they complete"
158 SshClient.clientFinished(self)
159 self.commands = []
160 self.results = []
161
162
164 """
165 Cache all the Ssh connections so they can be managed
166 """
167
170
171
173 "Make a new SSH connection if there isn't one available"
174 dc = cmd.deviceConfig
175 result = self.pool.get(dc.device, None)
176 if result is None:
177 log.debug("Creating connection to %s", dc.device)
178 options = Options(dc.username, dc.password,
179 dc.loginTimeout, dc.commandTimeout,
180 dc.keyPath, dc.concurrentSessions)
181 # New param KeyPath
182 result = MySshClient(dc.device, dc.ipAddress, dc.port,
183 options=options)
184 result.run()
185 self.pool[dc.device] = result
186 return result
187
188
190 "close the SSH connection to a device, if it exists"
191 c = self.pool.get(device, None)
192 if c:
193 log.debug("Closing connection to %s", device)
194 if c.transport:
195 c.transport.loseConnection()
196 del self.pool[device]
197
198
202
203
205 "reduce the number of connections using the schedule for guidance"
206 # compute device list in order of next use
207 devices = []
208 for c in schedule:
209 device = c.deviceConfig.device
210 if device not in devices:
211 devices.append(device)
212 # close as many devices as needed
213 while devices and len(self.pool) > MAX_CONNECTIONS:
214 self._close(devices.pop())
215
216
218 """
219 Run a single command across a cached SSH connection
220 """
221 exitCode = None
222 output = None
223
226
227
229 "Initiate a command on the remote device"
230 self.defer = defer.Deferred()
231 c = self.pool.get(cmd)
232 try:
233 d = Timeout(c.addCommand(cmd.command),
234 cmd.deviceConfig.commandTimeout,
235 cmd)
236 except Exception, ex:
237 log.warning('Error starting command: %s', ex)
238 self.pool.close(cmd)
239 return defer.fail(ex)
240 d.addErrback(self.timeout)
241 d.addBoth(self.processEnded)
242 return d
243
244
246 "Deal with slow executing command/connection (close it)"
247 cmd, = arg.value.args
248 # we could send a kill signal, but then we would need to track
249 # the command channel to send it to: just close the connection
250 self.pool.close(cmd)
251 return arg
252
253
260
261
263 lastChange = 0.
264 device = ''
265 ipAddress = ''
266 port = 0
267 username = ''
268 password = ''
269 loginTimeout = 0.
270 commandTimeout = 0.
271 keyPath = ''
272 pb.setUnjellyableForClass(DeviceConfig, DeviceConfig)
273
274
276 id = ''
277 component = ''
278 rrdPath = ''
279 rrdType = None
280 rrdCreateCommand = ''
281 rrdMin = None
282 rrdMax = None
283
285 self.data = {}
286
289
290 pb.setUnjellyableForClass(DataPointConfig, DataPointConfig)
291
293 """
294 Holds the config of every command to be run
295 """
296 command = None
297 useSsh = False
298 cycleTime = None
299 eventClass = None
300 eventKey = None
301 severity = 3
302 lastStart = 0
303 lastStop = 0
304 result = None
305
306
309
312
313
315 cmd, args = (self.command + ' ').split(' ', 1)
316 cmd = cmd.split('/')[-1]
317 return '%s %s' % (cmd, args)
318
319
321 if self.running():
322 return self.lastStart + self.cycleTime
323 return self.lastStop + self.cycleTime
324
325
327 if self.useSsh:
328 pr = SshRunner(pool)
329 else:
330 pr = ProcessRunner()
331 d = pr.start(self)
332 self.lastStart = time.time()
333 log.debug('Process %s started' % self.name())
334 d.addBoth(self.processEnded)
335 return d
336
337
339 self.result = pr
340 self.lastStop = time.time()
341
342 # Check for a condition that could cause zencommand to stop cycling.
343 # http://dev.zenoss.org/trac/ticket/4936
344 if self.lastStop < self.lastStart:
345 log.debug('System clock went back?')
346 self.lastStop = self.lastStart
347
348 if not isinstance(pr, failure.Failure):
349 log.debug('Process %s stopped (%s), %.2f seconds elapsed' % (
350 self.name(),
351 pr.exitCode,
352 self.lastStop - self.lastStart))
353 return self
354 return pr
355
356
358 self.deviceConfig = deviceConfig
359 self.useSsh = cfg.useSsh
360 self.cycleTime = max(cfg.cycleTime, 1)
361 self.eventKey = cfg.eventKey
362 self.eventClass = cfg.eventClass
363 self.severity = cfg.severity
364 self.command = str(cfg.command)
365 self.points = cfg.points
366 return self
367
369 # fetch datapoint name from filename path and add it to the event key
370 return self.eventKey + '|' + point.rrdPath.split('/')[-1]
371
376
377 pb.setUnjellyableForClass(Cmd, Cmd)
378
# NOTE(review): the class line was missing from the scraped listing; the
# name is restored from the Options(...) call that builds the SSH client
# options -- confirm the name and any base class against the real file.
class Options:
    """
    Settings object passed as ``options`` when an SSH client is created.

    The three class attributes are fixed defaults; everything else is
    supplied per device through the constructor.
    """
    loginTries = 1
    searchPath = ''
    existenceTest = None

    def __init__(self, username, password, loginTimeout, commandTimeout,
                 keyPath, concurrentSessions):
        "Store the per-device connection settings unchanged"
        self.username, self.password = username, password
        self.loginTimeout, self.commandTimeout = loginTimeout, commandTimeout
        self.keyPath = keyPath
        self.concurrentSessions = concurrentSessions
392
393
395
397 """
398 config - one of the configurations that was returned by calling
399 getDataSourceCommands on zenhub.
400 scheduledCmds - a dictionary that maps (device, command) to Cmd object
401 for the commands currently on zencommand's schedule.
402 sendEvent - a function that sends events to zenhub using twisted
403 perspective broker.
404 """
405 self._config = config
406 self._scheduledCmds = scheduledCmds
407 self._sendEvent = sendEvent
408 self._device = config.device
409 self._suppressed = [] # log warning and send event once per device
410 self._summary = 'zCommandUsername is not set'
411
413 "send an event (or clear it) for username not set"
414 self._sendEvent(dict(
415 device=self._device,
416 eventClass='/Cmd/Fail',
417 eventKey='zCommandUsername',
418 severity=severity,
419 component='zencommand',
420 summary=self._summary))
421
423 """
424 Warn that the username is not set for device and the SSH command cannot be
425 executed.
426 """
427 if self._device not in self._suppressed:
428 log.warning(self._summary + ' for %s' % self._device)
429 self._sendUsernameEvent(Error)
430 msg = 'username not configured for %s. skipping %s.'
431 log.debug(msg % (self._device, command))
432
434 """
435 Go through the Cmd objects in config.commands and update the config on
436 the command with config. If currentDict has the command on it then use
437 that one, otherwise use the command from config. If the device does not have a
438 username set, then don't yield commands that use SSH.
439 """
440 for cmd in self._config.commands:
441 key = (self._device, cmd.command)
442 if cmd.useSsh:
443 if self._config.username:
444 self._sendUsernameEvent(Clear)
445 else:
446 self._warnUsernameNotSet(cmd.command)
447 if self._device not in self._suppressed:
448 self._suppressed.append(self._device)
449 if key in self._scheduledCmds:
450 del self._scheduledCmds[key]
451 continue
452 if key in self._scheduledCmds:
453 newCmd = self._scheduledCmds.pop(key)
454 else:
455 newCmd = cmd
456 yield newCmd.updateConfig(cmd, self._config)
457 self._suppressed = []
458
459
461 """
462 Daemon code to schedule commands and run them.
463 """
464
465 initialServices = RRDDaemon.initialServices + ['CommandConfig']
466
468 RRDDaemon.__init__(self, 'zencommand')
469 self.schedule = []
470 self.timeout = None
471 self.pool = SshPool()
472 self.executed = 0
473
475 self.log.debug("zenhub has asked us to delete device %s" % doomed)
476 self.schedule = [c for c in self.schedule if c.deviceConfig.device != doomed]
477
479 self.log.debug("Configuration update from zenhub")
480 self.updateConfig([config], [config.device])
481
# NOTE(review): the def line was missing from the scraped listing; the
# parameter name comes from the body, the method name from the remote_*
# naming used by the neighbouring zenhub callbacks -- confirm both.
def remote_updateDeviceList(self, devices):
    """
    Reconcile the schedule with the device list sent by zenhub.

    Commands for devices that are no longer listed are dropped from the
    schedule; devices whose change stamp differs are re-fetched.

    @param devices: sequence of (device name, last change) pairs
    """
    self.log.debug("zenhub sent updated device list %s" % devices)
    updated = []
    lastChanges = dict(devices) # map device name to last change
    keep = []
    for cmd in self.schedule:
        # device name and change stamp live on the device configuration,
        # not on the command itself
        dc = cmd.deviceConfig
        if dc.device in lastChanges:
            # NOTE(review): this refreshes when the local stamp is newer
            # than zenhub's; the direction is kept as found -- confirm.
            if (dc.lastChange > lastChanges[dc.device]
                    and dc.device not in updated):
                updated.append(dc.device)
            keep.append(cmd)
        else:
            self.log.info("Removing all commands for %s", dc.device)
    self.schedule = keep
    if updated:
        self.log.info("Fetching the config for %s", updated)
        # NOTE(review): this sends the (name, last change) pairs rather
        # than the names in `updated`; kept as found -- confirm what
        # getDataSourceCommands expects.
        d = self.model().callRemote('getDataSourceCommands', devices)
        d.addCallback(self.updateConfig, updated)
        d.addErrback(self.error)
500
# NOTE(review): the def line was missing from the scraped listing; the
# signature is restored from the self.updateConfig(configs, devices) call
# sites and the names used in the body -- confirm against the real file.
def updateConfig(self, configs, expected):
    """
    Merge freshly fetched device configurations into the schedule.

    @param configs: device configurations to apply
    @param expected: names of the devices being refreshed; scheduled
        commands for every other device are kept as they are
    """
    expected = Set(expected)
    # commands already scheduled for the refreshed devices, keyed by
    # (device, command); the configuration processor removes the entries
    # it re-uses or rejects
    current = {}
    # keep all the commands we didn't ask for
    update = []
    for c in self.schedule:
        if c.deviceConfig.device in expected:
            current[c.deviceConfig.device, c.command] = c
        else:
            update.append(c)
    for cfg in configs:
        self.thresholds.updateForDevice(cfg.device, cfg.thresholds)
        if self.options.device and self.options.device != cfg.device:
            continue
        processor = ConfigurationProcessor(cfg, current, self.sendEvent)
        update.extend(processor.updateCommands())
    # anything still in current was not claimed by a processed configuration
    for device, command in current:
        # arguments follow the order of the placeholders in the message
        self.log.info("Deleting command %s from %s", command, device)
    self.schedule = update
    self.processSchedule()
519
521 "There is no master 'cycle' to send the heartbeat"
522 self.heartbeat()
523 reactor.callLater(self.heartbeatTimeout/3, self.heartbeatCycle)
524 events = []
525 events += self.rrdStats.gauge('schedule',
526 self.heartbeatTimeout,
527 len(self.schedule))
528 events += self.rrdStats.counter('commands',
529 self.heartbeatTimeout,
530 self.executed)
531 events += self.rrdStats.counter('dataPoints',
532 self.heartbeatTimeout,
533 self.rrd.dataPoints)
534 events += self.rrdStats.gauge('cyclePoints',
535 self.heartbeatTimeout,
536 self.rrd.endCycle())
537 self.sendEvents(events)
538
539
541 """
542 Run through the schedule and start anything that needs to be done.
543 Set a timer if we have nothing to do.
544 """
545 if not self.options.cycle:
546 for cmd in self.schedule:
547 if cmd.running() or cmd.lastStart == 0:
548 break
549 else:
550 self.stop()
551 return
552 try:
553 if self.timeout and not self.timeout.called:
554 self.timeout.cancel()
555 self.timeout = None
556 def compare(x, y):
557 return cmp(x.nextRun(), y.nextRun())
558 self.schedule.sort(compare)
559 self.pool.trimConnections(self.schedule)
560 earliest = None
561 running = 0
562 now = time.time()
563 for c in self.schedule:
564 if c.running():
565 running += 1
566
567 for c in self.schedule:
568 if running >= self.options.parallel:
569 break
570 if c.nextRun() <= now:
571 c.start(self.pool).addBoth(self.finished)
572 running += 1
573 else:
574 earliest = c.nextRun() - now
575 break
576
577 if earliest is not None:
578 self.log.debug("Next command in %d seconds", int(earliest))
579 self.timeout = reactor.callLater(max(1, earliest),
580 self.processSchedule)
581 except Exception, ex:
582 self.log.exception(ex)
583
584
586 """
587 Send an event using the info in the Cmd object.
588 """
589 self.sendEvent(dict(device=cmd.deviceConfig.device,
590 component=cmd.component,
591 eventClass=cmd.eventClass,
592 eventKey=cmd.eventKey,
593 severity=severity,
594 summary=summary))
595
597 """
598 The command has finished. cmdOrErr is either a Cmd instance or a
599 twisted failure.
600 """
601 self.executed += 1
602 if isinstance(cmdOrErr, failure.Failure):
603 self.error(cmdOrErr)
604 else:
605 cmd = cmdOrErr
606 self._handleExitCode(cmd)
607 self.parseResults(cmd)
608 self.processSchedule()
609
611 """
612 zencommand handles sending clears for exit code 0, all other exit codes
613 should be handled by the parser associated with the command
614 """
615 exitCode = cmd.result.exitCode
616 msg = 'Cmd: %s - Code: %s - Msg: %s' % (
617 cmd.command, exitCode, getExitMessage(exitCode))
618 if exitCode == 0:
619 self.sendCmdEvent(cmd, Clear, msg)
620
622 """
623 The finished method indicated that there was a failure. This method
624 is also called by RRDDaemon.errorStop.
625 """
626 if isinstance(err.value, TimeoutError):
627 cmd, = err.value.args
628 msg = "Command timed out on device %s: %r" % (
629 cmd.deviceConfig.device, cmd.command)
630 self.log.warning(msg)
631 self.sendCmdEvent(cmd, cmd.severity, msg)
632 else:
633 self.log.exception(err.value)
634
636 """
637 Process the results of our command-line, send events
638 and check datapoints.
639
640 @param cmd: command
641 @type: cmd object
642 """
643 self.log.debug('The result of "%s" was "%r"', cmd.command, cmd.result.output)
644 results = ParsedResults()
645 try:
646 parser = cmd.parser.create()
647 except Exception, ex:
648 self.log.exception("Error loading parser %s" % cmd.parser)
649 import traceback
650 self.sendEvent(dict(device=cmd.deviceConfig.device,
651 summary="Error loading parser %s" % cmd.parser,
652 component="zencommand",
653 message=traceback.format_exc(),
654 agent="zencommand",
655 ))
656 return
657 parser.processResults(cmd, results)
658
659 for ev in results.events:
660 self.sendEvent(ev, device=cmd.deviceConfig.device)
661
662 for dp, value in results.values:
663 self.log.debug("Storing %s = %s into %s" % (dp.id, value, dp.rrdPath))
664 value = self.rrd.save(dp.rrdPath,
665 value,
666 dp.rrdType,
667 dp.rrdCreateCommand,
668 cmd.cycleTime,
669 dp.rrdMin,
670 dp.rrdMax)
671 self.log.debug("RRD save result: %s" % value)
672 for ev in self.thresholds.check(dp.rrdPath, time.time(), value):
673 eventKey = cmd.getEventKey(dp)
674 if 'eventKey' in ev:
675 ev['eventKey'] = '%s|%s' % (eventKey, ev['eventKey'])
676 else:
677 ev['eventKey'] = eventKey
678 ev['component'] = dp.component
679 self.sendEvent(ev)
680
def doFetchConfig(driver):
    # Generator that fetches the collector configuration step by step.
    # Each `yield` submits one remote call on the zenhub model; the
    # driver.next() that follows is where that call's result is consumed.
    try:
        # start of the fetch, used for the 'configTime' statistic below
        now = time.time()

        yield self.model().callRemote('propertyItems')
        self.setPropertyItems(driver.next())

        yield self.model().callRemote('getDefaultRRDCreateCommand')
        createCommand = driver.next()

        yield self.model().callRemote('getThresholdClasses')
        self.remote_updateThresholdClasses(driver.next())


        yield self.model().callRemote('getCollectorThresholds')
        self.rrdStats.config(self.options.monitor,
                             self.name,
                             driver.next(),
                             createCommand)

        # an empty list is sent unless a single device was requested
        devices = []
        if self.options.device:
            devices = [self.options.device]
        yield self.model().callRemote('getDataSourceCommands',
                                      devices)
        if not devices:
            # no device filter: every device already on the schedule is
            # passed to updateConfig as expected
            devices = list(Set([c.deviceConfig.device
                                for c in self.schedule]))
        self.updateConfig(driver.next(), devices)

        self.rrd = RRDUtil(createCommand, 60)

        # report how long the whole fetch took
        self.sendEvents(
            self.rrdStats.gauge('configTime',
                                self.configCycleInterval * 60,
                                time.time() - now))

    except Exception, ex:
        # log here, then let the failure propagate to the caller
        self.log.exception(ex)
        raise
722
723 return drive(doFetchConfig)
724
725
727 """
728 Fetch the configuration and return a deferred for its completion.
729 Also starts the config cycle
730 """
731 ex = None
732 try:
733 self.log.debug('Fetching configuration from zenhub')
734 yield self.fetchConfig()
735 driver.next()
736 self.log.debug('Finished config fetch')
737 except Exception, ex:
738 self.log.exception(ex)
739 driveLater(self.configCycleInterval * 60, self.start)
740 if ex:
741 raise ex
742
744 RRDDaemon.buildOptions(self)
745
746 self.parser.add_option('--parallel', dest='parallel',
747 default=10, type='int',
748 help="Number of devices to collect at one time")
749
751 d = drive(self.start).addCallbacks(self.processSchedule, self.errorStop)
752 if self.options.cycle:
753 d.addCallback(self.heartbeatCycle)
754
755
if __name__ == '__main__':
    # The daemon class is imported through its package path instead of
    # being taken from this module's own namespace.
    from Products.ZenRRD.zencommand import zencommand
    daemon = zencommand()
    daemon.run()
760
| Trees | Indices | Help |
|
|---|
| Generated by Epydoc 3.0beta1 on Mon Oct 19 14:44:21 2009 | http://epydoc.sourceforge.net |