1
2
3
4
5
6
7
8
9
10
11
12
13
14 __doc__="""TelnetSession
15
16 TelnetSession is used by TelnetSession to issue commands to a machine
17 and return their output over the telnet protocol.
18
19 Device Tree Parameters are:
20
21 zTelnetLoginTries - number of times to try login default: 1
22 zTelnetLoginTimeout - timeout for expect statements during login default: 2
23 zTelnetPromptPause - pause used during prompt discovery default: 0.2
24 zTelnetCommandTimeout - default timeout when executing a command default: 5
25 zTelnetLoginRegex - regex to match the login prompt default: 'ogin:.$'
26 zTelnetPasswordRegex - regext to match the password prompt default: 'assword:.$'
27 zTelnetSuccessRegexList - list of regex that match good login prompts
28 default: [ '\$.$', '\#.$' ]
29
30 Other Parameters that are used by both TelnetSession and SshTransport:
31 zCommandPathList - list of path to check for a command
32 zCommandExistanceCheck - shell command issued to look for executible
33 must echo succ if executible is found
34 default: test -f executible
35
36 $Id: TelnetSession.py,v 1.5 2004/02/14 19:11:00 edahl Exp $"""
37
38 __version__ = "$Revision: 1.5 $"[11:-2]
39
40 import time
41 import telnetlib
42 import re
43 import os.path
44
45 import logging
46
47 from Exceptions import NoValidConnection, LoginFailed, CommandTimeout
48 from Exceptions import CommandNotFound
49
50
52
53 - def __init__(self, searchPath=[], existenceCheck='test -f %s',
54 options=None, context=None, log=None):
55
56 self.searchPath = searchPath
57 self.existenceCheck = existenceCheck
58
59 defaultLoginTries = 1
60 defaultLoginTimeout = 2
61 defaultPromptPause = 0.2
62 defaultCommandTimeout = 5
63 defaultLoginRegex = 'ogin:.$'
64 defaultPasswordRegex = 'assword:.$'
65 defaultSuccessRegexList = [ '\$.$', '\#.$' ]
66
67 if options and options.loginTries:
68 defaultLoginTries = options.loginTries
69
70 if options and options.loginTimeout:
71 defaultLoginTimeout = options.loginTimeout
72
73 if options and options.promptPause:
74 defaultPromptPause = options.promptPause
75
76 if options and options.commandTimeout:
77 defaultCommandTimeout = options.commandTimeout
78
79 if options and options.loginRegex:
80 defaultLoginRegex = options.loginRegex
81
82 if options and options.passwordRegex:
83 defaultPasswordRegex = options.passwordRegex
84
85
86 if context:
87 self.loginTries = getattr(self,
88 'zTelnetLoginTries', defaultLoginTries)
89 self.loginTimeout = getattr(self,
90 'zTelnetLoginTimeout', defaultLoginTimeout)
91 self.promptPause = getattr(self,
92 'zTelnetPromptPause', defaultPromptPause)
93 self.commandTimeout = getattr(self,
94 'zTelnetCommandTimeout', defaultCommandTimeout)
95 self.loginRegex = getattr(self,
96 'zTelnetLoginRegex', defaultLoginRegex)
97 self.passwordRegex = getattr(self,
98 'zTelnetPasswordRegex', defaultPasswordRegex)
99 self.successRegexList = getattr(self,
100 'zTelnetSuccessRegexList', defaultSuccessRegexList)
101 else:
102 self.loginTries = defaultLoginTries
103 self.loginTimeout = defaultLoginTimeout
104 self.promptPause = defaultPromptPause
105 self.commandTimeout = defaultCommandTimeout
106 self.loginRegex = defaultLoginRegex
107 self.passwordRegex = defaultPasswordRegex
108 self.successRegexList = defaultSuccessRegexList
109 self.tn = None
110 self.cmdprompt = ""
111 self.hostname = ""
112
113 if log:
114 self.log = log
115 else:
116 logging.basicConfig()
117 self.log = logging.getLogger(self.__class__.__name__)
118 if options: self.log.setLevel(options.logseverity)
119
120
121 - def check(self, hostname, timeout=1):
122 "check to see if a device supports telnet"
123 from telnetlib import Telnet
124 import socket
125 try:
126 tn = Telnet(hostname)
127 index, match, data = tn.expect([self.loginRegex,], timeout)
128 tn.close()
129 if index == 0: return 1
130 except socket.error:
131 return 0
132
133
134 - def connect(self, hostname, username, password):
135 "login to a machine using telnet"
136 self.hostname = hostname
137 self.tn = telnetlib.Telnet(hostname)
138 loginTries = self.loginTries
139 while 1:
140 self.tn.expect([self.loginRegex,], self.loginTimeout)
141 self.tn.write(username + "\n")
142 self.tn.expect([self.passwordRegex,], self.loginTimeout)
143 self.tn.write(password + "\n")
144 succTest = [self.loginRegex, self.passwordRegex,]
145 succTest.extend(self.successRegexList)
146 index, match, text = self.tn.expect(succTest, self.loginTimeout)
147 if index < 2:
148 self.log.debug("login failed text returned = '%s'" % text)
149 if loginTries <= 1:
150 raise LoginFailed, "login to %s failed" % hostname
151 else:
152 loginTries -= 1
153 else:
154 self.log.info("telnet connection open to device %s" % hostname)
155 break
156 self.guessCommandPrompt()
157
158
167
168
183
184
186 "close the telnet session"
187 self.tn.write("exit\n")
188 self.cmdprompt = ""
189 self.log.info("telnet connection to device %s closed" % self.hostname)
190
191
193 "figure out the command cmdprompt string by issuing a bunch of \ns"
194 self.tn.read_very_eager()
195 p1 = ""
196 p2 = ""
197 i = 8
198 while i:
199 self.tn.write("\n")
200 time.sleep(self.promptPause)
201 p1 = self.tn.read_very_eager()
202 if p1 == p2: break
203 p2 = p1
204 p1 = ""
205 i -= 1
206 if p1:
207 self.cmdprompt = p1
208 self.log.debug("found cmdprompt = '%s'" % self.cmdprompt)
209 else:
210 raise NoValidConnection, "no command prompt found"
211
212
226
227
228
230 import getpass
231 import os
232 import sys
233
234 from optparse import OptionParser
235
236 usage = "%prog [options] hostname command"
237 parser = OptionParser(usage=usage,
238 version="%prog " + __version__)
239
240 if os.environ.has_key('USER'):
241 username = os.environ['USER']
242 else:
243 username = ''
244 parser.add_option('-u', '--user',
245 dest='username',
246 default=username,
247 help='Login username')
248 parser.add_option('-p', '--password',
249 dest='password',
250 default='',
251 help='Login password')
252 parser.add_option('-t', '--loginTries',
253 dest='loginTries',
254 type = 'int',
255 help='number of times to try login')
256 parser.add_option('-l', '--loginTimeout',
257 dest='loginTimeout',
258 type = 'float',
259 help='timeout login expect statments')
260 parser.add_option('-P', '--promptPause',
261 dest='promptPause',
262 type = 'float',
263 help='time to pause in prompt discovery loop')
264 parser.add_option('-c', '--commandTimeout',
265 dest='commandTimeout',
266 type = 'float',
267 help='timeout when issuing a command')
268 parser.add_option('-L', '--loginRegex',
269 dest='loginRegex',
270 help='regex that will find the login prompt')
271 parser.add_option('-w', '--passwordRegex',
272 dest='passwordRegex',
273 help='regex that will find the password prompt')
274 parser.add_option('-s', '--searchPath',
275 dest='searchPath',
276 default="",
277 help='Path to use when looking for commands')
278 parser.add_option('-v', '--logseverity',
279 dest='logseverity',
280 default=logging.INFO,
281 type='int',
282 help='Logging severity threshold')
283
284
285 options, args = parser.parse_args()
286 tt = TelnetSession(searchPath=options.searchPath,options=options)
287 if len(args) < 2:
288 parser.print_help()
289 sys.exit(1)
290 hostname = args[0]
291 command = args[1]
292 if not options.password:
293 password = getpass.getpass()
294 else: password = options.password
295 tt.connect(hostname, options.username, password)
296 import pprint
297 pprint.pprint(tt.execute(command, 1.5))
298 tt.disconnect()
299
300 if __name__ == '__main__':
301 main()
302