| Trees | Indices | Help |
|
|---|
|
|
1 ###########################################################################
2 #
3 # This program is part of Zenoss Core, an open source monitoring platform.
4 # Copyright (C) 2007, 2009, Zenoss Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify it
7 # under the terms of the GNU General Public License version 2 as published by
8 # the Free Software Foundation.
9 #
10 # For complete information please visit: http://www.zenoss.com/oss/
11 #
12 ###########################################################################
13
14 __doc__ = """ZenCommand
15
16 Run Command plugins periodically.
17
18 """
19
20 import time
21 from pprint import pformat
22 import logging
23 log = logging.getLogger("zen.zencommand")
24 from sets import Set
25
26 from twisted.internet import reactor, defer, error
27 from twisted.internet.protocol import ProcessProtocol
28 from twisted.python import failure
29 from twisted.spread import pb
30
31 import Globals
32 from Products.ZenUtils.Driver import drive, driveLater
33 from Products.ZenUtils.Utils import unused
34 from Products.ZenRRD.RRDDaemon import RRDDaemon
35 from Products.ZenRRD.RRDUtil import RRDUtil
36 from Products.DataCollector.SshClient import SshClient
37 from Products.ZenEvents.ZenEventClasses import Clear
38 from Products.ZenRRD.CommandParser import ParsedResults
39
40 from Products.DataCollector import Plugins
41 unused(Plugins)
# Upper bound on cached SSH connections: the pool-trimming code keeps closing
# connections while the pool holds more than this many.
42 MAX_CONNECTIONS = 256
43
44
45
54
55
def Timeout(deferred, seconds, obj):
    """
    Cause an error on a deferred when it is taking too long to complete.

    Schedules a reactor timer. If C{deferred} has not fired within
    C{seconds}, it is errbacked with a Failure wrapping TimeoutError(obj).
    If it fires first, the pending timer is cancelled.

    @param deferred: the Deferred to guard
    @param seconds: time limit, passed to reactor.callLater
    @param obj: stored as the TimeoutError's single argument (callers pass
        the Cmd so error handlers can recover it)
    @return: the same C{deferred}, for chaining
    """

    def _timeout(deferred, obj):
        "took too long... call an errback"
        # Firing an already-called Deferred raises AlreadyCalledError inside
        # the reactor. That can happen if the result arrived but the callback
        # chain is paused before _cb had a chance to cancel this timer.
        if not deferred.called:
            deferred.errback(failure.Failure(TimeoutError(obj)))

    def _cb(arg, timer):
        "the command finished, possibly by timing out"
        if not timer.called:
            timer.cancel()
        return arg

    timer = reactor.callLater(seconds, _timeout, deferred, obj)
    deferred.addBoth(_cb, timer)
    return deferred
74
75
# NOTE(review): epydoc listing with dropped lines -- the embedded numbers skip
# 76, 84, 99, 108-110 and 113, so the class statement of this local-process
# runner (created as ProcessRunner() in Cmd's start code) and its def lines
# are missing here.
77 """
78 Provide deferred process execution
79 """
# Class-level defaults, overwritten per run: stopped holds the pending
# Deferred until the process ends, exitCode the process exit status, output
# the text logged as the command's output.
# NOTE(review): nothing visible appends to output; the missing lines 108-110
# presumably hold the data-received handler -- confirm.
80 stopped = None
81 exitCode = None
82 output = ''
83
# start(cmd) (called as pr.start(self) from Cmd): spawn the command under
# /bin/sh and return a Deferred guarded by Timeout with the device's
# commandTimeout. The 'exec' prefix presumably makes the shell replace itself
# with the command, so the KILL sent on timeout reaches the command itself.
# NOTE(review): cmd.command is interpolated into a shell string unescaped;
# this assumes the command text comes from trusted zenhub configuration.
85 "Kick off the process: run it locally"
86 log.debug('running %r' % cmd.command)
87
88 shell = '/bin/sh'
89 self.cmdline = (shell, '-c', 'exec %s' % cmd.command)
90 self.command = ' '.join(self.cmdline)
91 log.debug('cmd line: %r' % self.command)
92 reactor.spawnProcess(self, shell, self.cmdline, env=None)
93
94 d = Timeout(defer.Deferred(), cmd.deviceConfig.commandTimeout, cmd)
95 self.stopped = d
96 self.stopped.addErrback(self.timeout)
97 return d
98
# timeout(value): errback added in start. Sends SIGKILL to the process,
# ignoring the case where it has already exited, and returns the failure
# unchanged so it keeps propagating to later errbacks.
100 "Kill a process if it takes too long"
101 try:
102 self.transport.signalProcess('KILL')
103 except error.ProcessExitedAlready:
104 log.debug("Command already exited: %s" % self.command)
105 return value
106
107
111
112
# Presumably ProcessProtocol.processEnded(reason) (def line 113 missing).
# Records the exit code, then fires the pending Deferred with this runner.
# If Timeout already errbacked it, d.called is True and no second result is
# delivered.
114 "notify the starter that their process is complete"
115 self.exitCode = reason.value.exitCode
116 log.debug('Received exit code: %s' % self.exitCode)
117 log.debug('Command: %r' % self.command)
118 log.debug('Output: %r' % self.output)
119
120 if self.stopped:
121 d, self.stopped = self.stopped, None
122 if not d.called:
123 d.callback(self)
125
# NOTE(review): epydoc listing with dropped lines (126, 131-133, 135, 143,
# 151-153, 156). The class statement (MySshClient, built by the SSH pool) and
# the initializer that presumably creates self.defers are not shown.
127 """
128 Connection to SSH server at the remote device
129 """
130
134
# addCommand(command): queue the command on the SSH client and return a
# Deferred that fires with (data, code) when its result arrives.
# NOTE(review): Deferreds are keyed by the command string. Queueing the same
# command twice before the first result arrives would overwrite the first
# Deferred, which would then never fire -- confirm this cannot happen.
136 "Run a command against the server"
137 d = defer.Deferred()
138 self.defers[command] = d
139 SshClient.addCommand(self, command)
140 return d
141
142
# addResult(command, data, code): let SshClient record the result, then fire
# the matching Deferred. defers.pop raises KeyError for a command that was
# not queued through addCommand.
144 "Forward the results of the command execution to the starter"
145 SshClient.addResult(self, command, data, code)
146 d = self.defers.pop(command)
147 if not d.called:
148 d.callback((data, code))
149
150
154
155
# clientFinished(): after SshClient's own handling, drop the accumulated
# command/result lists; results are already delivered through the Deferreds.
157 "We don't need to track commands/results when they complete"
158 SshClient.clientFinished(self)
159 self.commands = []
160 self.results = []
161
162
# NOTE(review): epydoc listing with dropped lines (163, 168-169, 172, 189,
# 199-201, 204). The class statement (SshPool), the initializer that
# presumably creates the self.pool dict, and the close(cmd) method used by
# the SSH runner are not shown.
164 """
165 Cache all the Ssh connections so they can be managed
166 """
167
170
171
# get(cmd): one cached client per device name. On first use, build Options
# from the command's device config, create the client, start it with run()
# and cache it.
173 "Make a new SSH connection if there isn't one available"
174 dc = cmd.deviceConfig
175 result = self.pool.get(dc.device, None)
176 if result is None:
177 log.debug("Creating connection to %s", dc.device)
178 options = Options(dc.username, dc.password,
179 dc.loginTimeout, dc.commandTimeout,
180 dc.keyPath, dc.concurrentSessions)
181 # New param KeyPath
182 result = MySshClient(dc.device, dc.ipAddress, dc.port,
183 options=options)
184 result.run()
185 self.pool[dc.device] = result
186 return result
187
188
# _close(device): drop the device's cached connection, closing its transport
# if there is one. Does nothing when the device has no cached connection.
190 "close the SSH connection to a device, if it exists"
191 c = self.pool.get(device, None)
192 if c:
193 log.debug("Closing connection to %s", device)
194 if c.transport:
195 c.transport.loseConnection()
196 del self.pool[device]
197
198
202
203
# trimConnections(schedule): the scheduler sorts the schedule by next run
# time just before calling this. Devices are popped from the end of the
# de-duplicated list, so the ones needed latest are closed first, until the
# pool is back at MAX_CONNECTIONS.
# NOTE(review): the `device not in devices` list scan makes the
# de-duplication quadratic in schedule length; a companion set would make it
# linear.
205 "reduce the number of connections using the schedule for guidance"
206 # compute device list in order of next use
207 devices = []
208 for c in schedule:
209 device = c.deviceConfig.device
210 if device not in devices:
211 devices.append(device)
212 # close as many devices as needed
213 while devices and len(self.pool) > MAX_CONNECTIONS:
214 self._close(devices.pop())
215
216
# NOTE(review): epydoc listing with dropped lines (217, 224-225, 228, 245,
# 254-259). The class statement (SshRunner, created with the pool in Cmd's
# start code), its initializer (presumably storing self.pool) and its
# processEnded handler are not shown.
218 """
219 Run a single command across a cached SSH connection
220 """
221 exitCode = None
222 output = None
223
226
227
# start(cmd): send cmd.command over the device's cached connection, guarded
# by Timeout. If queueing the command raises, the connection is closed and a
# failed Deferred is returned.
# NOTE(review): self.defer is created but never used in the visible code.
# NOTE(review): self.pool.get(cmd) is outside the try, so an error while
# creating the connection propagates to the caller instead of becoming a
# failed Deferred.
229 "Initiate a command on the remote device"
230 self.defer = defer.Deferred()
231 c = self.pool.get(cmd)
232 try:
233 d = Timeout(c.addCommand(cmd.command),
234 cmd.deviceConfig.commandTimeout,
235 cmd)
236 except Exception, ex:
237 log.warning('Error starting command: %s', ex)
238 self.pool.close(cmd)
239 return defer.fail(ex)
240 d.addErrback(self.timeout)
241 d.addBoth(self.processEnded)
242 return d
243
244
# timeout(arg): errback on the command's Deferred. Closes the device's
# connection and returns the failure so it keeps propagating.
# NOTE(review): this runs for any failure, not only timeouts. `cmd, =`
# assumes the failure wraps Timeout's TimeoutError(cmd); another exception
# type (e.g. one from the SSH client) may not have exactly one arg -- confirm.
246 "Deal with slow executing command/connection (close it)"
247 cmd, = arg.value.args
248 # we could send a kill signal, but then we would need to track
249 # the command channel to send it to: just close the connection
250 self.pool.close(cmd)
251 return arg
252
253
260
261
# Class-level defaults of DeviceConfig (its class statement, listing line
# 262, is missing). Registering the class with Perspective Broker lets
# configurations received from zenhub be rebuilt locally as DeviceConfig.
# NOTE(review): concurrentSessions (read when the SSH pool builds Options)
# has no default here -- confirm it is always sent by zenhub.
263 lastChange = 0.
264 device = ''
265 ipAddress = ''
266 port = 0
267 username = ''
268 password = ''
269 loginTimeout = 0.
270 commandTimeout = 0.
271 keyPath = ''
272 pb.setUnjellyableForClass(DeviceConfig, DeviceConfig)
273
274
# Class-level defaults of DataPointConfig (class statement, listing line
# 275, is missing). When results are stored, the daemon reads id, component
# and the rrd* fields of each data point.
276 id = ''
277 component = ''
278 rrdPath = ''
279 rrdType = None
280 rrdCreateCommand = ''
281 rrdMin = None
282 rrdMax = None
283
# NOTE(review): this line belongs to a method whose def line (284) is
# missing, presumably the initializer; lines 287-288 are also missing.
285 self.data = {}
286
289
290 pb.setUnjellyableForClass(DataPointConfig, DataPointConfig)
291
# NOTE(review): epydoc listing of class Cmd with dropped lines (292, 307-308,
# 310-311, 314, 320, 326, 338, 357, 368, 372-375). The class statement and
# the running() method used below are not shown.
293 """
294 Holds the config of every command to be run
295 """
# Configuration copied from zenhub (command, useSsh, cycleTime, event
# fields) plus local run state: lastStart/lastStop timestamps and result,
# which holds the last runner or failure.
296 command = None
297 useSsh = False
298 cycleTime = None
299 eventClass = None
300 eventKey = None
301 severity = 3
302 lastStart = 0
303 lastStop = 0
304 result = None
305
306
309
312
313
# name(): short label "<program basename> <arguments>" for log messages.
# Because a space is appended before splitting, the label always ends with a
# trailing space.
315 cmd, args = (self.command + ' ').split(' ', 1)
316 cmd = cmd.split('/')[-1]
317 return '%s %s' % (cmd, args)
318
319
# nextRun(): time this command is next due, one cycleTime after it last
# started if it is running, otherwise one cycleTime after it last stopped.
321 if self.running():
322 return self.lastStart + self.cycleTime
323 return self.lastStop + self.cycleTime
324
325
# start(pool): run over SSH (using the shared connection pool) or as a local
# process. Stamps lastStart and chains processEnded onto the runner's
# Deferred.
327 if self.useSsh:
328 pr = SshRunner(pool)
329 else:
330 pr = ProcessRunner()
331 d = pr.start(self)
332 self.lastStart = time.time()
333 log.debug('Process %s started' % self.name())
334 d.addBoth(self.processEnded)
335 return d
336
337
# processEnded(pr): pr is either the runner (success) or a Failure. Stores it
# in self.result and stamps lastStop. On success returns self, so later
# callbacks receive this Cmd; a failure is returned unchanged so it keeps
# propagating.
339 self.result = pr
340 self.lastStop = time.time()
341
342 # Check for a condition that could cause zencommand to stop cycling.
343 # http://dev.zenoss.org/trac/ticket/4936
344 if self.lastStop < self.lastStart:
345 log.debug('System clock went back?')
346 self.lastStop = self.lastStart
347
348 if not isinstance(pr, failure.Failure):
349 log.debug('Process %s stopped (%s), %.2f seconds elapsed' % (
350 self.name(),
351 pr.exitCode,
352 self.lastStop - self.lastStart))
353 return self
354 return pr
355
356
# updateConfig(cfg, deviceConfig): copy configuration from cfg onto this Cmd.
# Timing state is left alone, so a reused Cmd keeps its place in the
# schedule. cycleTime is clamped to at least 1 second.
# NOTE(review): parser and component are read from Cmd objects elsewhere but
# are not refreshed here -- confirm whether reused Cmds should copy them.
358 self.deviceConfig = deviceConfig
359 self.useSsh = cfg.useSsh
360 self.cycleTime = max(cfg.cycleTime, 1)
361 self.eventKey = cfg.eventKey
362 self.eventClass = cfg.eventClass
363 self.severity = cfg.severity
364 self.command = str(cfg.command)
365 self.points = cfg.points
366 return self
367
# getEventKey(point): per-data-point event key "<eventKey>|<rrd file name>".
# NOTE(review): eventKey defaults to None; if it was never set, the
# concatenation raises TypeError.
369 # fetch datapoint name from filename path and add it to the event key
370 return self.eventKey + '|' + point.rrdPath.split('/')[-1]
371
376
377 pb.setUnjellyableForClass(Cmd, Cmd)
378
# Options passed as options= to each MySshClient by the SSH pool. The class
# statement (listing line 379) is missing; the "- " before def is Epydoc's
# code-folding marker. loginTries, searchPath and existenceTest are fixed
# defaults, presumably read by SshClient -- confirm.
380 loginTries=1
381 searchPath=''
382 existenceTest=None
383
384 - def __init__(self, username, password, loginTimeout, commandTimeout,
385 keyPath, concurrentSessions):
386 self.username = username
387 self.password = password
388 self.loginTimeout=loginTimeout
389 self.commandTimeout=commandTimeout
390 self.keyPath = keyPath
391 self.concurrentSessions = concurrentSessions
392
393
def warnUsernameNotSet(device, sshCmd, sendEvent, suppressed):
    """
    Warn that the username is not set for device and the SSH command cannot be
    executed.

    The warning log line and the /Cmd/Fail event are emitted only when
    C{device} is not already in C{suppressed}. This function does not add
    C{device} to C{suppressed}; the caller does that. A debug line naming
    the skipped command is always logged.

    @param device: device name
    @param sshCmd: the SSH command being skipped; only its C{command} text
        is used
    @param sendEvent: callable that takes an event dict
    @param suppressed: devices already warned about in this pass
    """
    if device not in suppressed:
        summary = 'zCommandUsername is not set'
        log.warning('%s for %s', summary, device)
        sendEvent(dict(device=device,
                       eventClass='/Cmd/Fail',
                       eventKey='zCommandUsername',
                       severity=4,
                       component='zencommand',
                       summary=summary))
    log.debug('username not configured for %s. skipping %s.',
              device, sshCmd.command)
410
411
def updateCommands(config, currentDict, sendEvent):
    """
    Go through the Cmd objects in config.commands and update the config on
    each command with config. If currentDict has the command on it then use
    that one, otherwise use the command from config. If the device does not
    have a username set, then don't yield commands that use SSH.

    This is a generator. Once it is exhausted, the (config.device, command)
    key of every command in config.commands has been removed from
    currentDict, whether the command was yielded or skipped.

    Parameters:

    config - one of the configurations that was returned by calling
    getDataSourceCommands on zenhub.

    currentDict - a dictionary that maps (device, command) to Cmd object
    for the commands currently on zencommand's schedule. Modified in place.

    sendEvent - callable passed through to warnUsernameNotSet for the
    "zCommandUsername is not set" event.
    """
    suppressed = []  # log warning and send event once per device

    for cmd in config.commands:
        key = (config.device, cmd.command)

        if not config.username and cmd.useSsh:
            warnUsernameNotSet(config.device, cmd, sendEvent, suppressed)
            if config.device not in suppressed:
                suppressed.append(config.device)
            # forget any scheduled copy and do not yield this command
            currentDict.pop(key, None)
            continue

        # Prefer the already-scheduled Cmd so its lastStart/lastStop timing
        # survives the config update; otherwise schedule the new one.
        newCmd = currentDict.pop(key, cmd)
        yield newCmd.updateConfig(cmd, config)
446
447
# NOTE(review): epydoc listing of the daemon class (instantiated as
# zencommand() in the main guard) with dropped lines. The class statement
# (448) and the def lines 455, 462, 466 and 470 are missing.
449 """
450 Daemon code to schedule commands and run them.
451 """
452
# adds the 'CommandConfig' zenhub service to those RRDDaemon already uses
453 initialServices = RRDDaemon.initialServices + ['CommandConfig']
454
# initializer: schedule is the list of Cmd objects; timeout holds the
# reactor.callLater handle for the next scheduler pass; pool caches SSH
# connections; executed counts finished commands for the heartbeat stats.
456 RRDDaemon.__init__(self, 'zencommand')
457 self.schedule = []
458 self.timeout = None
459 self.pool = SshPool()
460 self.executed = 0
461
# zenhub asked to delete a device: drop every scheduled command for it
463 self.log.debug("zenhub has asked us to delete device %s" % doomed)
464 self.schedule = [c for c in self.schedule if c.deviceConfig.device != doomed]
465
# zenhub pushed one device's configuration: replace that device's commands
467 self.log.debug("Configuration update from zenhub")
468 self.updateConfig([config], [config.device])
469
# zenhub sent the device list; dict(devices) implies it is a sequence of
# (device name, last change) pairs. Commands for unlisted devices are
# removed, and devices whose config changed are refetched.
# NOTE(review): likely bug -- `cmd.device` and `cmd.lastChange` are not among
# Cmd's visible attributes. Elsewhere the device name is
# cmd.deviceConfig.device, and lastChange is declared on DeviceConfig, so
# this looks like it would raise AttributeError. The comparison also reads
# inverted: a newer lastChange from zenhub should trigger a refetch,
# i.e. lastChanges[name] > cmd.deviceConfig.lastChange -- confirm.
# NOTE(review): getDataSourceCommands is called with `devices` (the pairs),
# whereas the config fetch passes device names; `updated` (names) may be
# what was intended. `updated` also gets one entry per command, not per
# device.
471 self.log.debug("zenhub sent updated device list %s" % devices)
472 updated = []
473 lastChanges = dict(devices) # map device name to last change
474 keep = []
475 for cmd in self.schedule:
476 if cmd.deviceConfig.device in lastChanges:
477 if cmd.lastChange > lastChanges[cmd.device]:
478 updated.append(cmd.deviceConfig.device)
479 keep.append(cmd)
480 else:
481 self.log.info("Removing all commands for %s", cmd.deviceConfig.device)
482 self.schedule = keep
483 if updated:
484 self.log.info("Fetching the config for %s", updated)
485 d = self.model().callRemote('getDataSourceCommands', devices)
486 d.addCallback(self.updateConfig, updated)
487 d.addErrback(self.error)
488
def updateConfig(self, configs, expected):
    """
    Replace the scheduled commands of the C{expected} devices with the
    commands in C{configs}, then run the scheduler.

    Commands for devices not in C{expected} are kept unchanged. For expected
    devices, an already-scheduled Cmd with the same (device, command) key is
    reused. Scheduled commands that no config mentions any more are dropped
    and logged. Thresholds are refreshed for every config's device, even one
    skipped because of --device.

    @param configs: device configurations, as returned by zenhub's
        getDataSourceCommands
    @param expected: names of the devices whose commands are being replaced
    """
    expected = Set(expected)
    current = {}  # (device, command) -> scheduled Cmd of an expected device
    update = []   # new schedule, seeded with the commands we didn't ask for
    for c in self.schedule:
        device = c.deviceConfig.device
        if device in expected:
            current[device, c.command] = c
        else:
            update.append(c)
    for cfg in configs:
        self.thresholds.updateForDevice(cfg.device, cfg.thresholds)
        # with --device, only that device's commands are scheduled
        if self.options.device and self.options.device != cfg.device:
            continue
        update.extend(updateCommands(cfg, current, self.sendEvent))
    # whatever updateCommands did not claim is no longer configured
    for device, command in current:
        self.log.info("Deleting command %s from %s", command, device)
    self.schedule = update
    self.processSchedule()
506
# NOTE(review): def line 507 is missing; this is the heartbeat method that
# reschedules itself via self.heartbeatCycle. The next call is scheduled
# (every heartbeatTimeout/3 seconds) before the statistics work, so an
# exception below does not stop future heartbeats. The stats sent are:
# schedule size, commands executed, data points stored, and the
# cyclePoints value returned by rrd.endCycle().
508 "There is no master 'cycle' to send the heartbeat, so this method reschedules itself"
509 self.heartbeat()
510 reactor.callLater(self.heartbeatTimeout/3, self.heartbeatCycle)
511 events = []
512 events += self.rrdStats.gauge('schedule',
513 self.heartbeatTimeout,
514 len(self.schedule))
515 events += self.rrdStats.counter('commands',
516 self.heartbeatTimeout,
517 self.executed)
518 events += self.rrdStats.counter('dataPoints',
519 self.heartbeatTimeout,
520 self.rrd.dataPoints)
521 events += self.rrdStats.gauge('cyclePoints',
522 self.heartbeatTimeout,
523 self.rrd.endCycle())
524 self.sendEvents(events)
525
526
# NOTE(review): def line 527 (processSchedule) is missing from this listing.
528 """
529 Run through the schedule and start anything that needs to be done.
530 Set a timer if we have nothing to do.
531 """
# One-shot mode (not cycling): the for/else reaches stop() only when no
# command is running and every command has started at least once
# (lastStart == 0 means never started). An empty schedule stops immediately.
532 if not self.options.cycle:
533 for cmd in self.schedule:
534 if cmd.running() or cmd.lastStart == 0:
535 break
536 else:
537 self.stop()
538 return
539 try:
# Cancel any pending pass; a new timer is set below if needed.
540 if self.timeout and not self.timeout.called:
541 self.timeout.cancel()
542 self.timeout = None
# Python 2 cmp-style sort by next due time; connection trimming relies on
# this order.
543 def compare(x, y):
544 return cmp(x.nextRun(), y.nextRun())
545 self.schedule.sort(compare)
546 self.pool.trimConnections(self.schedule)
547 earliest = None
548 running = 0
549 now = time.time()
550 for c in self.schedule:
551 if c.running():
552 running += 1
553
# Start due commands until options.parallel commands are running. The first
# command not yet due gives the delay to the next pass. If the loop stops at
# the parallel limit, no timer is set; the next pass comes from finished()
# when a command completes.
# NOTE(review): this loop does not skip commands that are already running. A
# running command whose nextRun (lastStart + cycleTime) has passed would be
# started again -- confirm this cannot happen (e.g. a command that runs
# longer than its cycle time).
554 for c in self.schedule:
555 if running >= self.options.parallel:
556 break
557 if c.nextRun() <= now:
558 c.start(self.pool).addBoth(self.finished)
559 running += 1
560 else:
561 earliest = c.nextRun() - now
562 break
563
# the timer delay is never shorter than 1 second
564 if earliest is not None:
565 self.log.debug("Next command in %d seconds", int(earliest))
566 self.timeout = reactor.callLater(max(1, earliest),
567 self.processSchedule)
568 except Exception, ex:
569 self.log.exception(ex)
570
571
# NOTE(review): def lines 572, 583 and 597 are missing. By their call sites
# these are sendCmdEvent(cmd, severity, summary), finished(cmdOrErr) and
# error(err).
573 """
574 Send an event using the info in the Cmd object.
575 """
576 self.sendEvent(dict(device=cmd.deviceConfig.device,
577 component=cmd.component,
578 eventClass=cmd.eventClass,
579 eventKey=cmd.eventKey,
580 severity=severity,
581 summary=summary))
582
# finished(): on success, clears the command's event (severity Clear) without
# looking at the exit code, then parses the results. In both cases it then
# runs the scheduler again.
# NOTE(review): if parseResults raises, processSchedule below is skipped. When
# no scheduler timer is pending (the last pass stopped at the parallel
# limit), scheduling could stall; consider try/finally.
584 """
585 The command has finished. cmdOrErr is either a Cmd instance or a
586 twisted failure.
587 """
588 self.executed += 1
589 if isinstance(cmdOrErr, failure.Failure):
590 self.error(cmdOrErr)
591 else:
592 cmd = cmdOrErr
593 self.sendCmdEvent(cmd, Clear, "Clear")
594 self.parseResults(cmd)
595 self.processSchedule()
596
# error(): a Timeout failure carries the Cmd as its only arg, so a warning
# event is sent at the command's severity. Anything else is only logged.
# NOTE(review): log.exception is called outside an except block here, so the
# failure's traceback is probably not logged; err.getTraceback() may be what
# was intended -- confirm.
598 """
599 The finished method indicated that there was a failure. This method
600 is also called by RRDDaemon.errorStop.
601 """
602 if isinstance(err.value, TimeoutError):
603 cmd, = err.value.args
604 msg = "Command timed out on device %s: %r" % (
605 cmd.deviceConfig.device, cmd.command)
606 self.log.warning(msg)
607 self.sendCmdEvent(cmd, cmd.severity, msg)
608 else:
609 self.log.exception(err.value)
610
# NOTE(review): def line 611 (parseResults(cmd)) is missing from this listing.
612 """
613 Process the results of our command-line, send events
614 and check datapoints.
615
616 @param cmd: command
617 @type cmd: Cmd
618 """
619 self.log.debug('The result of "%s" was "%r"', cmd.command, cmd.result.output)
620 results = ParsedResults()
# If the parser cannot be created, log it, send an event carrying the
# traceback text and skip this result. The event dict sets no severity, so
# sendEvent's default applies.
621 try:
622 parser = cmd.parser.create()
623 except Exception, ex:
624 self.log.exception("Error loading parser %s" % cmd.parser)
625 import traceback
626 self.sendEvent(dict(device=cmd.deviceConfig.device,
627 summary="Error loading parser %s" % cmd.parser,
628 component="zencommand",
629 message=traceback.format_exc(),
630 agent="zencommand",
631 ))
632 return
# the parser fills results.events and results.values; its exceptions are
# not caught here
633 parser.processResults(cmd, results)
634
635 for ev in results.events:
636 self.sendEvent(ev, device=cmd.deviceConfig.device)
637
# value is rebound to whatever rrd.save returns, and thresholds are checked
# against that returned value (presumably the stored or derived value --
# confirm with RRDUtil). Threshold event keys are prefixed with the command's
# per-data-point key.
638 for dp, value in results.values:
639 self.log.debug("Storing %s = %s into %s" % (dp.id, value, dp.rrdPath))
640 value = self.rrd.save(dp.rrdPath,
641 value,
642 dp.rrdType,
643 dp.rrdCreateCommand,
644 cmd.cycleTime,
645 dp.rrdMin,
646 dp.rrdMax)
647 self.log.debug("RRD save result: %s" % value)
648 for ev in self.thresholds.check(dp.rrdPath, time.time(), value):
649 eventKey = cmd.getEventKey(dp)
650 if 'eventKey' in ev:
651 ev['eventKey'] = '%s|%s' % (eventKey, ev['eventKey'])
652 else:
653 ev['eventKey'] = eventKey
654 ev['component'] = dp.component
655 self.sendEvent(ev)
656
# NOTE(review): the enclosing def line 657 (fetchConfig(), called as
# self.fetchConfig()) is missing. doFetchConfig is driven by drive(). Each
# `yield` hands it a Deferred, and the following driver.next() retrieves that
# Deferred's result.
658 def doFetchConfig(driver):
659 try:
660 now = time.time()
661
662 yield self.model().callRemote('propertyItems')
663 self.setPropertyItems(driver.next())
664
665 yield self.model().callRemote('getDefaultRRDCreateCommand')
666 createCommand = driver.next()
667
668 yield self.model().callRemote('getThresholdClasses')
669 self.remote_updateThresholdClasses(driver.next())
670
671
672 yield self.model().callRemote('getCollectorThresholds')
673 self.rrdStats.config(self.options.monitor,
674 self.name,
675 driver.next(),
676 createCommand)
677
# An empty list presumably asks zenhub for every device of this collector --
# confirm. Without --device, `expected` becomes every device currently
# scheduled, so updateConfig replaces all of them.
678 devices = []
679 if self.options.device:
680 devices = [self.options.device]
681 yield self.model().callRemote('getDataSourceCommands',
682 devices)
683 if not devices:
684 devices = list(Set([c.deviceConfig.device
685 for c in self.schedule]))
686 self.updateConfig(driver.next(), devices)
687
# NOTE(review): the RRDUtil is (re)created after updateConfig has already run
# a scheduler pass; commands started there finish asynchronously, so their
# results are presumably handled after this line.
688 self.rrd = RRDUtil(createCommand, 60)
689
690 self.sendEvents(
691 self.rrdStats.gauge('configTime',
692 self.configCycleInterval * 60,
693 time.time() - now))
694
695 except Exception, ex:
696 self.log.exception(ex)
697 raise
698
699 return drive(doFetchConfig)
700
701
# NOTE(review): def line 702 is missing; this is start(driver), driven by
# drive(self.start) and rescheduled with driveLater(..., self.start).
703 """
704 Fetch the configuration and return a deferred for its completion.
705 Also starts the config cycle
706 """
707 ex = None
708 try:
709 self.log.debug('Fetching configuration from zenhub')
710 yield self.fetchConfig()
711 driver.next()
712 self.log.debug('Finished config fetch')
713 except Exception, ex:
714 self.log.exception(ex)
# The next config fetch is scheduled (configCycleInterval * 60 seconds) even
# after a failure; the failure is then re-raised to the caller.
715 driveLater(self.configCycleInterval * 60, self.start)
716 if ex:
717 raise ex
718
# buildOptions(): adds --parallel on top of RRDDaemon's options.
# NOTE(review): the help text says "devices", but the scheduler compares this
# limit with the number of running commands.
720 RRDDaemon.buildOptions(self)
721
722 self.parser.add_option('--parallel', dest='parallel',
723 default=10, type='int',
724 help="Number of devices to collect at one time")
725
# Startup hook (def line 726 missing): fetch config, then run the scheduler,
# or errorStop on failure. In cycle mode the heartbeat loop starts after that.
727 d = drive(self.start).addCallbacks(self.processSchedule, self.errorStop)
728 if self.options.cycle:
729 d.addCallback(self.heartbeatCycle)
730
731
if __name__ == '__main__':
    # Import the daemon class through its package path instead of using the
    # copy defined in this __main__ module, presumably so it is the same
    # class object the rest of Products.ZenRRD refers to -- confirm.
    from Products.ZenRRD.zencommand import zencommand
    z = zencommand()
    z.run()
736
| Trees | Indices | Help |
|
|---|
| Generated by Epydoc 3.0beta1 on Fri Aug 28 03:05:20 2009 | http://epydoc.sourceforge.net |