Package ZenRRD :: Module zentestcommand
[hide private]
[frames | no frames]

Source Code for Module ZenRRD.zentestcommand

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  __doc__=''' ZenTestCommand 
 15   
 16  Test the run of a ZenCommand and print output 
 17   
 18  $Id$''' 
 19   
 20  __version__ = "$Revision$"[11:-2] 
 21   
 22  import os 
 23  import popen2 
 24  import fcntl 
 25  import time 
 26  import sys 
 27  import select 
 28  import logging 
 29  import signal 
 30  log = logging.getLogger("zen.zentestcommand") 
 31   
 32  import Globals 
 33  from Products.ZenUtils.ZenScriptBase import ZenScriptBase 
 34   
 35   
36 -class TestRunner(ZenScriptBase):
37
38 - def __init__(self):
41
def getCommand(self, devName=None, dsName=None):
    """Return the command of a COMMAND datasource as applied to a device.

    devName and dsName default to the --device and --datasource command
    line options.  The device is looked up under the "Devices" root of
    self.dmd, and its RRD templates are searched in order for the first
    COMMAND datasource whose id equals dsName.

    Returns whatever that datasource's getCommand(device) returns.

    Writes a message and exits with status 1 when the device cannot be
    found or when none of its templates has a matching datasource.
    """
    # The --device option is stored under dest="device" (see buildOptions);
    # there is no "devName" attribute on self.options.
    if not devName:
        devName = self.options.device
    if not dsName:
        dsName = self.options.dsName

    device = self.dmd.getDmdRoot("Devices").findDevice(devName)
    if not device:
        self.write('Could not find device %s.' % devName)
        sys.exit(1)

    # First match wins, in template order.
    for templ in device.getRRDTemplates():
        for ds in templ.getRRDDataSources('COMMAND'):
            if ds.id == dsName:
                return ds.getCommand(device)

    self.write('No datasource %s applies to device %s.' % (dsName, devName))
    sys.exit(1)
62
def execute(self, cmd):
    """Run cmd and relay its combined stdout/stderr through self.write().

    A string cmd is run through the shell; a list or tuple is executed
    directly.  The child gets max(self.options.timeout, 1) seconds.  If it
    is still running after that, 'Command timed out' is written and the
    child is killed with SIGKILL.  Returns None.
    """
    # Imported locally so this method does not depend on the module-level
    # import list.  subprocess is the documented replacement for the
    # deprecated popen2 module.
    import subprocess

    child = subprocess.Popen(
        cmd,
        shell=not isinstance(cmd, (list, tuple)),
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,   # one merged stream, as Popen4 provided
        close_fds=True)             # popen2 closed inherited descriptors too
    output = child.stdout

    # Non-blocking mode so read() hands back what is available right now
    # instead of waiting for end-of-file.
    flags = fcntl.fcntl(output, fcntl.F_GETFL)
    fcntl.fcntl(output, fcntl.F_SETFL, flags | os.O_NDELAY)

    def relay():
        # Forward whatever output is currently available.
        try:
            text = output.read()
        except (IOError, OSError):
            # Nothing to read yet on the non-blocking descriptor.
            return
        if text:
            self.write(text)

    pollPeriod = 1
    timeout = max(self.options.timeout, 1)
    endtime = time.time() + timeout
    firstPass = True
    try:
        # firstPass guarantees one read attempt even if the child has
        # already exited by the time we get here.
        while time.time() < endtime and (
                firstPass or child.poll() is None):
            firstPass = False
            readable = select.select([output], [], [], pollPeriod)[0]
            if readable:
                relay()

        if child.poll() is None:
            self.write('Command timed out')
            try:
                os.kill(child.pid, signal.SIGKILL)
            except OSError:
                # The child exited between poll() and kill().
                pass
            child.wait()    # reap the killed child
        else:
            # The child may have written more between the last read and
            # its exit; pick that up before returning.
            relay()
    finally:
        child.stdin.close()
        output.close()
82
def write(self, text):
    """Print text to standard output, followed by a newline."""
    # With a single argument the parenthesised form is the same print
    # statement on Python 2 and a valid function call on Python 3.
    print(text)
85
def run(self):
    """Look up the requested datasource command and execute it.

    Both the --device and --datasource options are required; when either
    is missing a message is written and the process exits with status 2.
    """
    opts = self.options
    if not opts.device or not opts.dsName:
        self.write("Must provide a device and datasource.")
        sys.exit(2)
    self.execute(self.getCommand(opts.device, opts.dsName))
93
def buildOptions(self):
    """Add this script's options on top of the ZenScriptBase ones.

    -d / --device    device on which to test the command
    --datasource     COMMAND datasource to test (stored as dsName)
    -t / --timeout   command timeout, an int defaulting to 1
    """
    ZenScriptBase.buildOptions(self)
    addOption = self.parser.add_option
    addOption('-d', '--device',
              dest="device",
              help="Device on which to test command")
    addOption('--datasource',
              dest="dsName",
              help="COMMAND datasource to test")
    addOption('-t', '--timeout',
              dest="timeout", default=1, type="int",
              help="Command timeout")
if __name__ == '__main__':
    # Script entry point: build the runner and test the requested command.
    tr = TestRunner()
    tr.run()