Linux Kernel  3.7.1
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
rt-tester.py
Go to the documentation of this file.
1 #!/usr/bin/python
2 #
3 # rt-mutex tester
4 #
5 # (C) 2006 Thomas Gleixner <[email protected]>
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License version 2 as
9 # published by the Free Software Foundation.
10 #
11 import os
12 import sys
13 import getopt
14 import shutil
15 import string
16 
# Globals: option flags, all off (0) until the command line is parsed
quiet = test = comments = 0

# Pieces of a sysfs control file name; a full name is built as
# sysfsprefix + <thread id> + statusfile (or commandfile)
sysfsprefix = "/sys/devices/system/rttest/rttest"
statusfile, commandfile = "/status", "/command"
25 
# Command opcodes: maps the command name used in a test specification to
# the opcode string that is written to the command file
cmd_opcodes = {
    "schedother": "1",
    "schedfifo": "2",
    "lock": "3",
    "locknowait": "4",
    "lockint": "5",
    "lockintnowait": "6",
    "lockcont": "7",
    "unlock": "8",
    "signal": "11",
    "resetevent": "98",
    "reset": "99",
}
40 
# Test opcodes: maps the test name used in a test specification to
# [status field letter, comparison ("eq"/"lt"/"gt"), fixed expected value].
# The expected value is None when it is taken from the test line instead.
test_opcodes = {
    # "P" status field
    "prioeq": ["P", "eq", None],
    "priolt": ["P", "lt", None],
    "priogt": ["P", "gt", None],
    # "N" status field
    "nprioeq": ["N", "eq", None],
    "npriolt": ["N", "lt", None],
    "npriogt": ["N", "gt", None],
    # "M" status field: one decimal digit is compared with a fixed value
    "unlocked": ["M", "eq", 0],
    "trylock": ["M", "eq", 1],
    "blocked": ["M", "eq", 2],
    "blockedwake": ["M", "eq", 3],
    "locked": ["M", "eq", 4],
    # "O" status field
    "opcodeeq": ["O", "eq", None],
    "opcodelt": ["O", "lt", None],
    "opcodegt": ["O", "gt", None],
    # "E" status field
    "eventeq": ["E", "eq", None],
    "eventlt": ["E", "lt", None],
    "eventgt": ["E", "gt", None],
}
60 
def usage():
    """Print the command line help text to stdout."""
    help_lines = (
        "rt-tester.py <-c -h -q -t> <testfile>",
        " -c display comments after first command",
        " -h help",
        " -q quiet mode",
        " -t test mode (syntax check)",
        " testfile: read test specification from testfile",
        " otherwise from stdin",
    )
    for text in help_lines:
        # Single-argument call form prints the same on Python 2 and 3
        print(text)
71 
def progress(str):
    """Print the given text to stdout unless the global quiet flag is set."""
    if quiet:
        return
    print(str)
76 
def analyse(val, top, arg):
    """Compare one status value against the expectation of a test opcode.

    val -- status value, a string holding a decimal integer
    top -- test opcode entry: [field letter, "eq"/"lt"/"gt", fixed value]
    arg -- data field of the test line.  For field "M" it is the decimal
           digit position of val to inspect (0 = least significant) and
           the digit is compared with top[2].  For field "O" it is either
           a command name from cmd_opcodes or a number.  For all other
           fields it is the number to compare with.

    Returns 1 when the comparison named by top[1] holds, 0 otherwise.
    Raises ValueError when val or arg cannot be converted to an integer.
    """
    intval = int(val)

    if top[0] == "M":
        # Floor division keeps the digit extraction in integer arithmetic;
        # plain "/" yields a float under Python 3 true division and the
        # comparison with the integer state below would then never match.
        intval = (intval // (10 ** int(arg))) % 10
        argval = top[2]
    elif top[0] == "O":
        # Accept a symbolic command name as well as a raw opcode number
        argval = int(cmd_opcodes.get(arg, arg))
    else:
        argval = int(arg)

    if top[1] == "eq" and intval == argval:
        return 1
    if top[1] == "lt" and intval < argval:
        return 1
    if top[1] == "gt" and intval > argval:
        return 1
    return 0
100 
# Parse the commandline; an unknown option prints the help text and
# terminates with exit status 1
try:
    options, arguments = getopt.getopt(sys.argv[1:], 'chqt')
except getopt.GetoptError:
    usage()
    sys.exit(1)
107 
# Apply the commandline options to the global flags
for option, value in options:
    if option == '-h':
        # Help requested: print it and stop without running any test
        usage()
        sys.exit(0)
    if option == "-c":
        comments = 1
    elif option == "-q":
        quiet = 1
    elif option == "-t":
        test = 1
119 
# Select the input source: the test file named on the command line, or
# stdin when no file name was given
if arguments:
    try:
        fd = open(arguments[0])
    except (IOError, OSError) as ex:
        # Report the actual reason (missing file, permissions, ...) rather
        # than claiming "not found" for every kind of failure
        sys.stderr.write("Cannot open test file: %s\n" % (ex))
        sys.exit(1)
else:
    fd = sys.stdin
129 
linenr = 0

# Read the test patterns, one "cmd:opcode:threadid:data" record per line
while True:

    linenr = linenr + 1
    line = fd.readline()
    if not line:
        # readline() returns an empty string only at end of input
        break

    line = line.strip()
    parts = line.split(":")

    # Skip blank lines and lines whose first field is empty
    if not parts[0]:
        continue

    if parts[0].startswith("#"):
        # Comment lines are echoed only after the first command line was
        # seen (comments == 2), which in turn requires the -c option
        if comments > 1:
            progress(line)
        continue

    # First non-comment line reached: enable comment echo if -c was given
    if comments == 1:
        comments = 2

    progress(line)

    try:
        # The fields are extracted inside the try block so that a line
        # with too few fields is reported as a syntax error below instead
        # of terminating the script with an IndexError traceback
        cmd = parts[0].strip().lower()
        opc = parts[1].strip().lower()
        tid = parts[2].strip()
        dat = parts[3].strip()

        # Test or wait for a status value
        if cmd == "t" or cmd == "w":
            testop = test_opcodes[opc]

            fname = "%s%s%s" % (sysfsprefix, tid, statusfile)
            if test:
                # Syntax check only: show the file that would be read
                print(fname)
                continue

            # "t" checks the status once, "w" re-reads the status file
            # until the condition holds.
            # NOTE(review): the wait loop polls without any delay.
            while True:
                # NOTE(review): query stays 1 (success) when no status
                # field starts with the wanted letter - confirm intended
                query = 1
                with open(fname, 'r') as fsta:
                    status = fsta.readline().strip()
                for s in status.split(","):
                    s = s.strip()
                    if s.startswith(testop[0]):
                        # Separate status value: drop the field letter and
                        # the character that follows it
                        val = s[2:].strip()
                        query = analyse(val, testop, dat)
                        break
                if query or cmd == "t":
                    break

            progress(" " + status)

            if not query:
                sys.stderr.write("Test failed in line %d\n" % (linenr))
                sys.exit(1)

        # Issue a command to the tester
        elif cmd == "c":
            cmdnr = cmd_opcodes[opc]
            # Build command string and sys filename
            cmdstr = "%s:%s" % (cmdnr, dat)
            fname = "%s%s%s" % (sysfsprefix, tid, commandfile)
            if test:
                # Syntax check only: show the file that would be written
                print(fname)
                continue
            with open(fname, 'w') as fcmd:
                fcmd.write(cmdstr)

    except Exception as ex:
        sys.stderr.write(str(ex))
        sys.stderr.write("\nSyntax error in line %d\n" % (linenr))
        # In test mode keep going so that all errors of the file are shown
        if not test:
            fd.close()
            sys.exit(1)
215 
# Normal exit: all lines were processed, report success
print("Pass")
sys.exit(0)
219 
220