| Trees | Indices | Help |
|
|---|
|
|
1 ###########################################################################
2 #
3 # This program is part of Zenoss Core, an open source monitoring platform.
4 # Copyright (C) 2007, 2009, Zenoss Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify it
7 # under the terms of the GNU General Public License version 2 as published by
8 # the Free Software Foundation.
9 #
10 # For complete information please visit: http://www.zenoss.com/oss/
11 #
12 ###########################################################################
13
14 __doc__ = """ZenCommand
15
16 Run Command plugins periodically.
17
18 """
19
20 import time
21 from pprint import pformat
22 import logging
23 log = logging.getLogger("zen.zencommand")
24 from sets import Set
25
26 from twisted.internet import reactor, defer, error
27 from twisted.internet.protocol import ProcessProtocol
28 from twisted.python import failure
29 from twisted.spread import pb
30
31 import Globals
32 from Products.ZenUtils.Driver import drive, driveLater
33
34 from Products.ZenRRD.RRDDaemon import RRDDaemon
35 from Products.ZenRRD.RRDUtil import RRDUtil
36 from Products.DataCollector.SshClient import SshClient
37
38 from Products.ZenRRD.CommandParser import getParser, ParsedResults
39
40 MAX_CONNECTIONS = 256
41
42
43
52
53
55 "Cause an error on a deferred when it is taking too long to complete"
56
def _timeout(deferred, obj):
    """Fail ``deferred`` because the work took too long.

    A TimeoutError carrying ``obj`` is wrapped in a Failure and passed to
    the deferred's errback.
    """
    expired = TimeoutError(obj)
    reason = failure.Failure(expired)
    deferred.errback(reason)
60
61
def _cb(arg, timer):
    """Cancel the timer if it has not been called yet, then pass ``arg`` on.

    ``arg`` is returned unchanged so that the next callback or errback in
    the chain receives the same value.
    """
    stillPending = not timer.called
    if stillPending:
        timer.cancel()
    return arg
67
68
69 timer = reactor.callLater(seconds, _timeout, deferred, obj)
70 deferred.addBoth(_cb, timer)
71 return deferred
72
73
75 """
76 Provide deferred process execution
77 """
78 stopped = None
79 exitCode = None
80 output = ''
81
83 "Kick off the process: run it local"
84 log.debug('running %r' % cmd.command)
85
86 shell = '/bin/sh'
87 self.cmdline = (shell, '-c', 'exec %s' % cmd.command)
88 self.command = ' '.join(self.cmdline)
89 log.debug('cmd line: %r' % self.command)
90 reactor.spawnProcess(self, shell, self.cmdline, env=None)
91
92 d = Timeout(defer.Deferred(), cmd.deviceConfig.commandTimeout, cmd)
93 self.stopped = d
94 self.stopped.addErrback(self.timeout)
95 return d
96
98 "Kill a process if it takes too long"
99 try:
100 self.transport.signalProcess('KILL')
101 except error.ProcessExitedAlready:
102 log.debug("Command already exited: %s" % self.command)
103 return value
104
105
109
110
112 "notify the starter that their process is complete"
113 self.exitCode = reason.value.exitCode
114 log.debug('Received exit code: %s' % self.exitCode)
115 log.debug('Command: %r' % self.command)
116 log.debug('Output: %r' % self.output)
117
118 if self.stopped:
119 d, self.stopped = self.stopped, None
120 if not d.called:
121 d.callback(self)
122
123
125 """
126 Connection to SSH server at the remote device
127 """
128
132
134 "Run a command against the server"
135 d = defer.Deferred()
136 self.defers[command] = d
137 SshClient.addCommand(self, command)
138 return d
139
140
142 "Forward the results of the command execution to the starter"
143 SshClient.addResult(self, command, data, code)
144 d = self.defers.pop(command)
145 if not d.called:
146 d.callback((data, code))
147
148
152
153
155 "We don't need to track commands/results when they complete"
156 SshClient.clientFinished(self)
157 self.commands = []
158 self.results = []
159
160
162 """
163 Cache all the Ssh connections so they can be managed
164 """
165
168
169
171 "Make a new SSH connection if there isn't one available"
172 dc = cmd.deviceConfig
173 result = self.pool.get(dc.device, None)
174 if result is None:
175 log.debug("Creating connection to %s", dc.device)
176 options = Options(dc.username, dc.password,
177 dc.loginTimeout, dc.commandTimeout,
178 dc.keyPath, dc.concurrentSessions)
179 # New param KeyPath
180 result = MySshClient(dc.device, dc.ipAddress, dc.port,
181 options=options)
182 result.run()
183 self.pool[dc.device] = result
184 return result
185
186
188 "close the SSH connection to a device, if it exists"
189 c = self.pool.get(device, None)
190 if c:
191 log.debug("Closing connection to %s", device)
192 if c.transport:
193 c.transport.loseConnection()
194 del self.pool[device]
195
196
200
201
203 "reduce the number of connections using the schedule for guidance"
204 # compute device list in order of next use
205 devices = []
206 for c in schedule:
207 device = c.deviceConfig.device
208 if device not in devices:
209 devices.append(device)
210 # close as many devices as needed
211 while devices and len(self.pool) > MAX_CONNECTIONS:
212 self._close(devices.pop())
213
214
216 """
217 Run a single command across a cached SSH connection
218 """
219 exitCode = None
220 output = None
221
224
225
227 "Initiate a command on the remote device"
228 self.defer = defer.Deferred()
229 c = self.pool.get(cmd)
230 try:
231 d = Timeout(c.addCommand(cmd.command),
232 cmd.deviceConfig.commandTimeout,
233 cmd)
234 except Exception, ex:
235 log.warning('Error starting command: %s', ex)
236 self.pool.close(cmd)
237 return defer.fail(ex)
238 d.addErrback(self.timeout)
239 d.addBoth(self.processEnded)
240 return d
241
242
244 "Deal with slow executing command/connection (close it)"
245 cmd, = arg.value.args
246 # we could send a kill signal, but then we would need to track
247 # the command channel to send it to: just close the connection
248 self.pool.close(cmd)
249 return arg
250
251
258
259
261 lastChange = 0.
262 device = ''
263 ipAddress = ''
264 port = 0
265 username = ''
266 password = ''
267 loginTimeout = 0.
268 commandTimeout = 0.
269 keyPath = ''
270 pb.setUnjellyableForClass(DeviceConfig, DeviceConfig)
271
272
274 id = ''
275 component = ''
276 rrdPath = ''
277 rrdType = None
278 rrdCreateCommand = ''
279 rrdMin = None
280 rrdMax = None
281
283 self.data = {}
284
287
288 pb.setUnjellyableForClass(DataPointConfig, DataPointConfig)
289
291 """
292 Holds the config of every command to be run
293 """
294 command = None
295 useSsh = False
296 cycleTime = None
297 eventClass = None
298 eventKey = None
299 severity = 3
300 lastStart = 0
301 lastStop = 0
302 result = None
303
304
307
310
311
313 cmd, args = (self.command + ' ').split(' ', 1)
314 cmd = cmd.split('/')[-1]
315 return '%s %s' % (cmd, args)
316
317
319 if self.running():
320 return self.lastStart + self.cycleTime
321 return self.lastStop + self.cycleTime
322
323
325 if self.useSsh:
326 pr = SshRunner(pool)
327 else:
328 pr = ProcessRunner()
329 d = pr.start(self)
330 self.lastStart = time.time()
331 log.debug('Process %s started' % self.name())
332 d.addBoth(self.processEnded)
333 return d
334
335
337 self.result = pr
338 self.lastStop = time.time()
339 if not isinstance(pr, failure.Failure):
340 log.debug('Process %s stopped (%s), %.2f seconds elapsed' % (
341 self.name(),
342 pr.exitCode,
343 self.lastStop - self.lastStart))
344 return self
345 return pr
346
347
349 self.deviceConfig = deviceConfig
350 self.useSsh = cfg.useSsh
351 self.cycleTime = max(cfg.cycleTime, 1)
352 self.eventKey = cfg.eventKey
353 self.eventClass = cfg.eventClass
354 self.severity = cfg.severity
355 self.command = str(cfg.command)
356 self.points = cfg.points
357 return self
358
360 # fetch datapoint name from filename path and add it to the event key
361 return self.eventKey + '|' + point.rrdPath.split('/')[-1]
362
367
368 pb.setUnjellyableForClass(Cmd, Cmd)
369
371 loginTries=1
372 searchPath=''
373 existenceTest=None
374
def __init__(self, username, password, loginTimeout, commandTimeout,
             keyPath, concurrentSessions):
    """Record the connection options on the instance.

    Every argument is stored under an attribute of the same name.
    """
    self.username, self.password = username, password
    self.loginTimeout, self.commandTimeout = loginTimeout, commandTimeout
    self.keyPath, self.concurrentSessions = keyPath, concurrentSessions
383
384
386 """
387 Warn that the username is not set for device and the SSH command cannot be
388 executed.
389 """
390 if device not in suppressed:
391 summary = 'zCommandUsername is not set'
392 log.warning(summary + ' for %s' % device)
393 sendEvent(dict(device=device,
394 eventClass='/Cmd/Fail',
395 eventKey='zCommandUsername',
396 severity=4,
397 component='zencommand',
398 summary=summary))
399 msg = 'username not configured for %s. skipping %s.'
400 log.debug(msg % (device, sshCmd.command))
401
402
404 """
405 Go through the Cmd objects in config.commands and update the config on the
406 command with config. If currentDict has the command on it then use that
407 one, otherwise use the command from config. If the device does not have a
408 username set, then don't yield commands that use SSH.
409
410 Parameters:
411
412 config - one of the configurations that was returned by calling
413 getDataSourceCommands on zenhub.
414
415 currentDict - a dictionary that maps (device, command) to Cmd object
416 for the commands currently on zencommand's schedule.
417 """
418
419 suppressed = [] # log warning and send event once per device
420
421 for cmd in config.commands:
422
423 key = (config.device, cmd.command)
424
425 if not config.username and cmd.useSsh:
426 warnUsernameNotSet(config.device, cmd, sendEvent, suppressed)
427 if config.device not in suppressed:
428 suppressed.append(config.device)
429 if key in currentDict:
430 del currentDict[key]
431 continue
432
433 if key in currentDict: newCmd = currentDict.pop(key)
434 else : newCmd = cmd
435
436 yield newCmd.updateConfig(cmd, config)
437
438
440 """
441 Daemon code to schedule commands and run them.
442 """
443
444 initialServices = RRDDaemon.initialServices + ['CommandConfig']
445
447 RRDDaemon.__init__(self, 'zencommand')
448 self.schedule = []
449 self.timeout = None
450 self.pool = SshPool()
451 self.executed = 0
452
454 self.log.debug("zenhub has asked us to delete device %s" % doomed)
455 self.schedule = [c for c in self.schedule if c.deviceConfig.device != doomed]
456
458 self.log.debug("Configuration update from zenhub")
459 self.updateConfig([config], [config.device])
460
462 self.log.debug("zenhub sent updated device list %s" % devices)
463 updated = []
464 lastChanges = dict(devices) # map device name to last change
465 keep = []
466 for cmd in self.schedule:
467 if cmd.deviceConfig.device in lastChanges:
468 if cmd.lastChange > lastChanges[cmd.device]:
469 updated.append(cmd.deviceConfig.device)
470 keep.append(cmd)
471 else:
472 self.log.info("Removing all commands for %s", cmd.deviceConfig.device)
473 self.schedule = keep
474 if updated:
475 self.log.info("Fetching the config for %s", updated)
476 d = self.model().callRemote('getDataSourceCommands', devices)
477 d.addCallback(self.updateConfig, updated)
478 d.addErrback(self.error)
479
def updateConfig(self, configs, expected):
    """
    Merge freshly fetched device configurations into the schedule.

    Commands belonging to devices outside C{expected} are kept as they
    are.  Commands for devices in C{expected} are rebuilt from C{configs}
    through updateCommands(), which reuses (and removes from the lookup
    table) any command already on the schedule.  Whatever is left in the
    table afterwards is no longer configured and is dropped, with a log
    line per command.  The schedule is then processed again.

    @param configs: device configurations; each one supplies C{device}
        and C{thresholds} and is handed to updateCommands()
    @param expected: names of the devices whose commands are being replaced
    """
    expected = Set(expected)
    # (device, command) -> Cmd for every scheduled command we may replace
    current = {}
    for c in self.schedule:
        if c.deviceConfig.device in expected:
            current[c.deviceConfig.device, c.command] = c
    # keep all the commands we didn't ask for
    update = [c for c in self.schedule
              if c.deviceConfig.device not in expected]
    for cfg in configs:
        self.thresholds.updateForDevice(cfg.device, cfg.thresholds)
        # when restricted to a single device, ignore every other config
        if self.options.device and self.options.device != cfg.device:
            continue
        update.extend(updateCommands(cfg, current, self.sendEvent))
    # entries still in current were not reclaimed by updateCommands()
    for device, command in current:
        # arguments follow the wording of the message: command, then device
        self.log.info("Deleting command %s from %s", command, device)
    self.schedule = update
    self.processSchedule()
497
499 "There is no master 'cycle' to send the heartbeat"
500 self.heartbeat()
501 reactor.callLater(self.heartbeatTimeout/3, self.heartbeatCycle)
502 events = []
503 events += self.rrdStats.gauge('schedule',
504 self.heartbeatTimeout,
505 len(self.schedule))
506 events += self.rrdStats.counter('commands',
507 self.heartbeatTimeout,
508 self.executed)
509 events += self.rrdStats.counter('dataPoints',
510 self.heartbeatTimeout,
511 self.rrd.dataPoints)
512 events += self.rrdStats.gauge('cyclePoints',
513 self.heartbeatTimeout,
514 self.rrd.endCycle())
515 self.sendEvents(events)
516
517
519 """
520 Run through the schedule and start anything that needs to be done.
521 Set a timer if we have nothing to do.
522 """
523 if not self.options.cycle:
524 for cmd in self.schedule:
525 if cmd.running() or cmd.lastStart == 0:
526 break
527 else:
528 self.stop()
529 return
530 try:
531 if self.timeout and not self.timeout.called:
532 self.timeout.cancel()
533 self.timeout = None
def compare(x, y):
    "Order two commands by the time each one is next due to run"
    dueX = x.nextRun()
    dueY = y.nextRun()
    return cmp(dueX, dueY)
536 self.schedule.sort(compare)
537 self.pool.trimConnections(self.schedule)
538 earliest = None
539 running = 0
540 now = time.time()
541 for c in self.schedule:
542 if c.running():
543 running += 1
544
545 for c in self.schedule:
546 if running >= self.options.parallel:
547 break
548 if c.nextRun() <= now:
549 c.start(self.pool).addBoth(self.finished)
550 running += 1
551 else:
552 earliest = c.nextRun() - now
553 break
554
555 if earliest is not None:
556 self.log.debug("Next command in %d seconds", int(earliest))
557 self.timeout = reactor.callLater(max(1, earliest),
558 self.processSchedule)
559 except Exception, ex:
560 self.log.exception(ex)
561
562
564 self.executed += 1
565 if isinstance(cmd, failure.Failure):
566 self.error(cmd)
567 else:
568 self.parseResults(cmd)
569 self.processSchedule()
570
571
573 if isinstance(err.value, TimeoutError):
574 cmd, = err.value.args
575 dc = cmd.deviceConfig
576 msg = "Command timed out on device %s: %r" % (dc.device, cmd.command)
577 self.log.warning(msg)
578 self.sendEvent(dict(device=dc.device,
579 component="zencommand",
580 eventClass=cmd.eventClass,
581 eventKey=cmd.eventKey,
582 severity=cmd.severity,
583 summary=msg))
584 else:
585 self.log.exception(err.value)
586
588 """
589 Process the results of our command-line, send events
590 and check datapoints.
591
592 @param cmd: command
593 @type: cmd object
594 """
595 self.log.debug('The result of "%s" was "%r"', cmd.command, cmd.result.output)
596 results = ParsedResults()
597 try:
598 parser = getParser(cmd.parser)
599 except Exception, ex:
600 self.log.exception("Error loading parser %s" % cmd.parser)
601 import traceback
602 self.sendEvent(dict(device=cmd.deviceConfig.device,
603 summary="Error loading parser %s" % cmd.parser,
604 component="zencommand",
605 message=traceback.format_exc(),
606 agent="zencommand",
607 ))
608 return
609 parser.processResults(cmd, results)
610
611 for ev in results.events:
612 self.sendEvent(ev, device=cmd.deviceConfig.device)
613
614 for dp, value in results.values:
615 self.log.debug("Storing %s = %s into %s" % (dp.id, value, dp.rrdPath))
616 value = self.rrd.save(dp.rrdPath,
617 value,
618 dp.rrdType,
619 dp.rrdCreateCommand,
620 cmd.cycleTime,
621 dp.rrdMin,
622 dp.rrdMax)
623 self.log.debug("RRD save result: %s" % value)
624 for ev in self.thresholds.check(dp.rrdPath, time.time(), value):
625 eventKey = cmd.getEventKey(dp)
626 if 'eventKey' in ev:
627 ev['eventKey'] = '%s|%s' % (eventKey, ev['eventKey'])
628 else:
629 ev['eventKey'] = eventKey
630 ev['component'] = dp.component
631 self.sendEvent(ev)
632
def doFetchConfig(driver):
    """
    Generator that pulls the daemon's configuration, one remote call
    at a time.

    Every C{yield} hands the object returned by C{callRemote} to the
    driver; the following C{driver.next()} is used to read that call's
    result (presumably re-raising if the call failed -- confirm against
    Products.ZenUtils.Driver).  Any exception is logged and re-raised.
    """
    try:
        # start time, reported through the 'configTime' gauge at the end
        now = time.time()

        yield self.model().callRemote('propertyItems')
        self.setPropertyItems(driver.next())

        # kept in a local: needed for rrdStats.config() and RRDUtil() below
        yield self.model().callRemote('getDefaultRRDCreateCommand')
        createCommand = driver.next()

        yield self.model().callRemote('getThresholdClasses')
        self.remote_updateThresholdClasses(driver.next())


        yield self.model().callRemote('getCollectorThresholds')
        self.rrdStats.config(self.options.monitor,
                             self.name,
                             driver.next(),
                             createCommand)

        # an empty list is sent unless self.options.device is set
        devices = []
        if self.options.device:
            devices = [self.options.device]
        yield self.model().callRemote('getDataSourceCommands',
                                      devices)
        if not devices:
            # no device filter: every device currently on the schedule
            # is passed to updateConfig() as expected
            devices = list(Set([c.deviceConfig.device
                                for c in self.schedule]))
        self.updateConfig(driver.next(), devices)

        self.rrd = RRDUtil(createCommand, 60)

        # report how long the whole fetch took
        self.sendEvents(
            self.rrdStats.gauge('configTime',
                                self.configCycleInterval * 60,
                                time.time() - now))

    except Exception, ex:
        # log with traceback, then let the error propagate
        self.log.exception(ex)
        raise
674
675 return drive(doFetchConfig)
676
677
679 """
680 Fetch the configuration and return a deferred for its completion.
681 Also starts the config cycle
682 """
683 ex = None
684 try:
685 self.log.debug('Fetching configuration from zenhub')
686 yield self.fetchConfig()
687 driver.next()
688 self.log.debug('Finished config fetch')
689 except Exception, ex:
690 self.log.exception(ex)
691 driveLater(self.configCycleInterval * 60, self.start)
692 if ex:
693 raise ex
694
696 RRDDaemon.buildOptions(self)
697
698 self.parser.add_option('--parallel', dest='parallel',
699 default=10, type='int',
700 help="Number of devices to collect at one time")
701
703 d = drive(self.start).addCallbacks(self.processSchedule, self.errorStop)
704 if self.options.cycle:
705 d.addCallback(self.heartbeatCycle)
706
707
if __name__ == '__main__':
    # The daemon class is imported through its package path and run.
    from Products.ZenRRD.zencommand import zencommand
    daemon = zencommand()
    daemon.run()
712
| Trees | Indices | Help |
|
|---|
| Generated by Epydoc 3.0beta1 on Thu May 7 11:46:19 2009 | http://epydoc.sourceforge.net |