Package Products :: Package ZenEvents :: Module xtest
[hide private]
[frames] | no frames]

Source Code for Module Products.ZenEvents.xtest

  1  #! /usr/bin/env python  
  2  ############################################################################## 
  3  #  
  4  # Copyright (C) Zenoss, Inc. 2007, all rights reserved. 
  5  #  
  6  # This content is made available according to terms specified in 
  7  # License.zenoss under the directory where your Zenoss product is installed. 
  8  #  
  9  ############################################################################## 
 10   
 11   
 12  __doc__='''xtest.py 
 13   
 14  Sends test events to zenoss via xml-rpc. 
 15  Events can be specified on the command line or read from a file. 
 16  ''' 
 17   
 18  import Globals 
 19  from Products.ZenUtils.CmdBase import CmdBase 
 20  import xmlrpclib 
 21  import time 
 22  import sys 
 23  from Products.ZenEvents.ZenEventClasses import Status_Perf 
 24   
 25  # Input files must be in python format and contain a list named 'events' 
 26  # that contains one dictionary per test event. 
 27  # Each of these dictionaries should have values for device, summary, component 
 28  # and severity. 
 29  #Example: 
 30  # 
 31  #events = [ 
 32  #    { 
 33  #        'device': 'Device1a', 
 34  #        'summary': 'This is the summary.', 
 35  #        'component': 'Some component', 
 36  #        'severity': 4, 
 37  #    }, 
 38  #    { 
 39  #        'device': 'Device2a', 
 40  #        'summary': 'This is the summary.', 
 41  #        'component': 'Some component', 
 42  #        'severity': 4, 
 43  #    }, 
 44  #] 
 45   
 46   
47 -class XTest(CmdBase):
48 49 # Sample event and corresponding clear event used by several methods. 50 sampleEvent = dict(device='Sample device', 51 summary='Test event at %s' % time.time(), 52 eventClass=Status_Perf, 53 severity=4, 54 component='Sample component') 55 sampleClear = sampleEvent.copy() 56 sampleClear.update(dict( 57 severity=0, 58 summary='Clear event')) 59 60
61 - def __init__(self, noopts=0):
62 CmdBase.__init__(self, noopts) 63 self.proxy = None
64 65
66 - def buildOptions(self):
67 """basic options setup sub classes can add more options here""" 68 CmdBase.buildOptions(self) 69 self.parser.add_option('--file', 70 dest="filepath",default=None, 71 help="file containing event details") 72 self.parser.add_option('--sample', 73 dest='dosample', default=False, 74 action='store_true', 75 help='Send sample event and clear event') 76 self.parser.add_option('-d', '--device', 77 dest="device",default='', 78 help="device to use for event") 79 self.parser.add_option('-s', '--summary', 80 dest="summary",default='', 81 help="summary to use for event") 82 self.parser.add_option('-c', '--component', 83 dest="component",default='', 84 help="component to use for event") 85 self.parser.add_option('-y', '--severity', 86 dest="severity",default=4, 87 type='int', 88 help="severity to use for event") 89 self.parser.add_option('--rpchost', 90 dest="rpchost",default='localhost', 91 help="host for xml-rpc request") 92 self.parser.add_option('--rpcport', 93 dest="rpcport",default='8081', 94 help="port for xml-rpc request")
95 96
97 - def parseEventsFile(self, filepath=None):
98 ''' Not much actual parsing going on here, just importing 99 the given file. 100 ''' 101 if not filepath: 102 filepath = self.options.filepath 103 args = {} 104 execfile(filepath, {}, args) 105 if 'events' in args: 106 events = args['events'] 107 else: 108 events = [] 109 sys.stderr.write('%s has no value for events\n' % filepath) 110 return events
111 112
113 - def getXmlRpcProxy(self):
114 ''' Returns xmlrpc proxy, creating on if the instance doesn't 115 already have one. 116 ''' 117 if not self.proxy: 118 self.proxy = xmlrpclib.ServerProxy( 119 'http://%s:%s/' % (self.options.rpchost, self.options.rpcport), 120 #verbose=1, 121 encoding='iso-8859-1') 122 return self.proxy
123 124
125 - def sendEvents(self, events):
126 ''' events is a list of dictionaries with details of events to send. 127 This sends those events via the xmlrpc proxy. 128 ''' 129 proxy = self.getXmlRpcProxy() 130 proxy.sendEvents(events)
131 132
133 - def sendSampleEvents(self, repeat=1):
134 ''' Sends the sample event and corresponding clear event. 135 Repeats this as many times as specified by repeat. 136 ''' 137 for i in range(repeat): 138 self.sendEvents([self.sampleEvent, self.sampleClear])
139 140
141 - def sendSampleWithDelayedClear(self, repeat=1, delay=30):
142 ''' Sends sample event then after a delay sends the clear event, 143 repeating as specified. 144 ''' 145 self.sendEvents([self.sampleEvent]) 146 time.sleep(30) 147 self.sendEvents([self.sampleClear])
148 149
150 - def sendEventsFromFile(self, filepath=None):
151 ''' Parse the given file (or the file specified at init) and 152 send the files. 153 ''' 154 events = self.parseEventsFile(filepath=filepath) 155 self.sendEvents(events)
156 157
158 -def main():
159 "performance test" 160 xt = XTest() 161 xt.sendSampleEvents(repeat=100)
162
163 -def coverage():
164 xt = XTest() 165 xt.sendSampleEvents() 166 xt.sendEvents([xt.sampleEvent]) 167 proxy = xt.getXmlRpcProxy() 168 issues = proxy.getDevicePingIssues() 169 for i in issues: 170 print i
171
172 -def simple():
173 xt = XTest() 174 xt.sendSmapleWithDelayedClear()
175 176 if __name__ == '__main__': 177 xt = XTest() 178 if xt.options.dosample: 179 xt.sendSampleEvents() 180 elif xt.options.filepath: 181 xt.sendEventsFromFile() 182 elif xt.options.device: 183 event = dict(device=xt.options.device, 184 summary=xt.options.summary, 185 component=xt.options.component, 186 severity=xt.options.severity) 187 xt.sendEvents([event]) 188 else: 189 xt.parser.print_help() 190