Package DataCollector :: Module SshClient
[hide private]
[frames | no frames]

Source Code for Module DataCollector.SshClient

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  __doc__="""SshClient runs commands on a remote box using SSH and 
 15  returns their results. 
 16   
 17  See http://twistedmatrix.com/trac/wiki/Documentation for Twisted documentation, 
 18  specifically documentation on 'conch' (Twisted's SSH protocol support). 
 19  """ 
 20   
 21  import os 
 22  import sys 
 23  import logging 
 24  log = logging.getLogger("zen.SshClient") 
 25   
 26  import Globals 
 27   
 28  from twisted.conch.ssh import transport, userauth, connection 
 29  from twisted.conch.ssh import common, keys, channel 
 30  from twisted.internet import defer, reactor 
 31  from Products.ZenEvents import Event 
 32  from Products.ZenUtils.Utils import getExitMessage 
 33   
 34  from Exceptions import * 
 35   
 36  import CollectorClient 
 37   
 38  # NB: Most messages returned back from Twisted are Unicode. 
 39  #     Expect to use str() to convert to ASCII before dumping out. :) 
 40   
 41   
def sendEvent( self, message="", device='', severity=Event.Error ):
    """
    Shortcut version of sendEvent()

    Builds a "/Cmd/Fail" event and hands it to the first datacollector that
    can be reached from the calling object.  Delivery is best-effort: a
    failure while sending is logged at debug level and never propagated.

    @param self: object on whose behalf the event is sent; it is searched for
                 'factory', 'conn.factory' and 'datacollector' attributes
    @type self: object
    @param message: message to send in Zenoss event
    @type message: string
    @param device: hostname of device to which this event is associated
    @type device: string
    @param severity: Zenoss severity from Products.ZenEvents
    @type severity: integer
    """

    # Parse out the daemon's name
    component = os.path.basename( sys.argv[0] ).replace( '.py', '' )

    def resolve_path( object_root, path ):
        """
        The regular getattr() only works on one component, not multiples.

        @param object_root: object to start searching for path
        @type object_root: object
        @param path: path to func or variable (eg "conn.factory" )
        @type path: string
        @return: the object found at object_root.path, or None if any
                 component along the path is missing or is None
        @rtype: object
        """
        obj = object_root
        for chunk in path.split('.'):
            obj = getattr( obj, chunk, None )
            if obj is None:
                return None
        return obj

    # ... and the device's name (as known by Zenoss)
    if device == '':
        for path in ( "factory.hostname", "conn.factory.hostname" ):
            hostname = resolve_path( self, path )
            if hostname is not None:
                device = hostname
                break
        else:
            log.debug( "Couldn't get the remote device's hostname" )

    error_event = {
        'agent': component,
        'summary': message,
        'device': device,
        'eventClass': "/Cmd/Fail",
        'component': component,
        'severity': severity,
    }

    # At this point, we don't know what kind of object we were handed,
    # so probe the known locations of a datacollector in order.
    sender_paths = (
        "factory.datacollector.sendEvent",
        "datacollector.sendEvent",
        "conn.factory.datacollector.sendEvent",
    )
    try:
        for path in sender_paths:
            sender = resolve_path( self, path )
            if sender is not None:
                sender( error_event )
                break
        else:
            log.debug( "Unable to send event for %s" % error_event )

    except Exception:
        # Don't cause other issues: reporting a problem must never raise,
        # but leave a trace so a broken event path can be diagnosed.
        log.debug( "Failed to send event %r" % ( error_event, ),
                   exc_info=True )
class SshClientError( Exception ):
    """
    Error type used by the SSH client classes in this module, for example
    when no username or no password is available for a login attempt.
    """
119 120 121
class SshClientTransport(transport.SSHClientTransport):
    """
    Base client class for constructing Twisted Conch services.
    This class is *only* responsible for connecting to the SSH
    service on the device, and ensuring that *host* keys are sane.
    """

    def verifyHostKey(self, hostKey, fingerprint):
        """
        Module to verify the host's SSH key against the stored fingerprint we have
        from the last time that we communicated with the host.

        NB: currently does not verify this information but simply trusts every host key

        @param hostKey: host's SSH key (unused)
        @type hostKey: string
        @param fingerprint: host fingerprint (only logged)
        @type fingerprint: string
        @return: Twisted deferred object
        @rtype: Twisted deferred object (defer.succeed(1))
        @todo: verify the host key
        """
        #blowing off host key right now, should store and check
        from Products.ZenUtils.Utils import unused
        unused(hostKey)
        log.debug('%s host fingerprint: %s' % (self.factory.hostname, fingerprint))
        return defer.succeed(1)


    def connectionMade(self):
        """
        Called after the connection has been made.
        Publishes this protocol's transport on the factory before handing
        control to the Twisted implementation.
        """
        self.factory.transport = self.transport
        transport.SSHClientTransport.connectionMade(self)


    def receiveError( self, reasonCode, description ):
        """
        Called when a disconnect error message was received from the device.

        @param reasonCode: error code from SSH connection failure
        @type reasonCode: integer
        @param description: human-readable version of the error code
        @type description: string
        """
        message = 'SSH error from remote device (code %d): %s\n' % \
                  ( reasonCode, str( description ) )
        log.warn( message )
        sendEvent( self, message=message )
        transport.SSHClientTransport.receiveError(self, reasonCode, description )


    def receiveUnimplemented( self, seqnum ):
        """
        Called when an unimplemented packet message was received from the device.

        @param seqnum: SSH message code
        @type seqnum: integer
        """
        message = "Got 'unimplemented' SSH message, seqnum= %d" % seqnum
        log.info( message )
        sendEvent( self, message=message )
        transport.SSHClientTransport.receiveUnimplemented(self, seqnum)


    def receiveDebug( self, alwaysDisplay, message, lang ):
        """
        Called when a debug message was received from the device.

        @param alwaysDisplay: boolean-type code to indicate if the message is to be displayed
        @type alwaysDisplay: integer
        @param message: debug message from remote device
        @type message: string
        @param lang: language code
        @type lang: integer
        """
        # Keep the decorated text in its own variable so that the parent
        # class still receives the device's original, undecorated message.
        summary = "Debug message from remote device (%s): %s" % ( str(lang), str(message) )
        log.info( summary )
        sendEvent( self, message=summary, severity=Event.Debug )

        transport.SSHClientTransport.receiveDebug(self, alwaysDisplay, message, lang )


    def connectionSecure(self):
        """
        This is called after the connection is set up and other services can be run.
        This function starts the SshUserAuth client (ie the Connection client).
        """
        sshconn = SshConnection(self.factory)
        sshauth = SshUserAuth(self.factory.username, sshconn, self.factory)
        self.requestService(sshauth)
215 216 217
class SshUserAuth(userauth.SSHUserAuthClient):
    """
    Class to gather credentials for use with our SSH connection,
    and use them to authenticate against the remote device.
    """

    def __init__(self, user, instance, factory):
        """
        If no username is supplied, defaults to the user running this code (eg zenoss)

        @param user: username
        @type user: string
        @param instance: instance object
        @type instance: object
        @param factory: factory info
        @type factory: Twisted factory object
        @raise SshClientError: if no username can be determined
        """
        # Store the factory first: sendEvent() locates the device name and
        # the datacollector through self.factory, so it must already be set
        # if we need to report a failure below.
        self.factory = factory

        user = str(user) # damn unicode
        if user == '':
            log.debug( "Unable to determine username/password from " + \
                       "zCommandUser/zCommandPassword" )

            # From the Python docs about the preferred method of
            # obtaining user name in preference to os.getlogin()
            # (http://docs.python.org/library/os.html)
            import pwd
            user = os.environ.get( 'LOGNAME', '' )
            if user == '':
                # Only consult the password database when LOGNAME does not
                # help, so a failing lookup cannot discard a usable LOGNAME.
                try:
                    user = pwd.getpwuid( os.getuid() )[0]
                except Exception:
                    log.debug( "Unable to look up the current user",
                               exc_info=True )

        if user == '':
            message = "No zProperties defined and unable to determine current user."
            log.error( message )
            sendEvent( self, message=message )
            raise SshClientError( message )

        userauth.SSHUserAuthClient.__init__(self, user, instance)
        self.user = user


    def getPassword(self, unused=None):
        """
        Return a deferred object of success if there's a password or
        return fail (ie no zCommandPassword specified)

        @param unused: unused (unused)
        @type unused: string
        @return: Twisted deferred object (defer.succeed or defer.fail)
        @rtype: Twisted deferred object
        """
        if self.factory.password:
            return defer.succeed(self.factory.password)

        message = "SshUserAuth: no password found -- " + \
                  "has zCommandPassword been set?"
        log.error( message )
        sendEvent( self, message=message )
        self.factory.clientFinished()
        return defer.fail( SshClientError( message ) )


    def getPublicKey(self):
        """
        Return the SSH public key (using the zProperty zKeyPath) or None

        @return: SSH public key, or None if the zKeyPath file doesn't exist
        @rtype: string
        """
        keyPath = os.path.expanduser( self.factory.keyPath )
        log.debug('Expanded SSH public key path from zKeyPath %s to %s' % ( self.factory.keyPath, keyPath))

        # NOTE(review): the existence test is made on the zKeyPath file
        # itself, while the key is read from that path plus '.pub'.
        if not os.path.exists(keyPath):
            log.debug( "SSH public key path %s doesn't exist" % keyPath )
            return None

        return keys.getPublicKeyString( keyPath + '.pub' )


    def ssh_USERAUTH_FAILURE( self, packet):
        """
        Called when the SSH session can't authenticate.
        NB: This function is also called as an initializer
            to start the connections.

        @param packet: returned packet from the host
        @type packet: object
        """
        from twisted.conch.ssh.common import getNS
        canContinue, partial = getNS(packet)
        canContinue = canContinue.split(',')

        from Products.ZenUtils.Utils import unused
        unused(partial)

        lastAuth = getattr( self, "lastAuth", '')
        if lastAuth == '' or lastAuth == 'none':
            pass # Start our connection

        elif lastAuth == 'publickey':
            message = "SSH login to %s with SSH keys failed" % \
                      self.factory.hostname
            log.error( message )
            sendEvent( self, message=message )

        elif lastAuth == 'password':
            message = "SSH login to %s with username %s failed" % \
                      ( self.factory.hostname, self.user )
            log.error( message )
            sendEvent( self, message=message )

            self.factory.loginTries -= 1
            log.debug( "Decremented loginTries count to %d" % self.factory.loginTries )

            if self.factory.loginTries <= 0:
                message = "SSH connection aborted after maximum login attempts."
                log.error( message )
                sendEvent( self, message=message )

            else:
                return self.tryAuth('password')

        # Remember the method that just failed so it is not tried again.
        # The guarded local value is used so a missing 'lastAuth' attribute
        # cannot raise here.
        self.authenticatedWith.append(lastAuth)

        def preference(method):
            """
            Sort key: position of the method in self.preferredOrder;
            methods we have no preference for sort after all known ones.
            """
            try:
                return self.preferredOrder.index(method)
            except ValueError:
                return len(self.preferredOrder)

        canContinue.sort(key=preference)
        log.debug( 'Sorted list of authentication methods: %s' % canContinue)
        for method in canContinue:
            if method not in self.authenticatedWith:
                log.debug( "Attempting method %s" % method )
                if self.tryAuth(method):
                    return

        log.debug( "All authentication methods attempted" )
        self.factory.clientFinished()
        self.transport.sendDisconnect(transport.DISCONNECT_NO_MORE_AUTH_METHODS_AVAILABLE, 'No more authentication methods available')


    def getPrivateKey(self):
        """
        Return a deferred with the SSH private key (using the zProperty zKeyPath)

        @return: Twisted deferred object (defer.succeed), or None if the
                 key file doesn't exist
        @rtype: Twisted deferred object
        """
        keyPath = os.path.expanduser(self.factory.keyPath)
        log.debug('Expanded SSH private key path from zKeyPath %s to %s' % ( self.factory.keyPath, keyPath))

        if not os.path.exists(keyPath):
            log.debug( "SSH private key path %s doesn't exist" % keyPath )
            # Don't hand a None path to the key loader; report "no key".
            return None

        return defer.succeed(keys.getPrivateKeyObject(keyPath,
                                 passphrase=self.factory.password))
394 395 396
class SshConnection(connection.SSHConnection):
    """
    Wrapper class that starts channels on top of connections.
    """

    def __init__(self, factory):
        """
        Initializer

        @param factory: factory containing the connection info
        @type factory: Twisted factory object
        """
        log.debug( "Creating new SSH connection..." )
        connection.SSHConnection.__init__(self)
        self.factory = factory


    def ssh_CHANNEL_FAILURE( self, packet):
        """
        Called when a CHANNEL_FAILURE message is received from the device.
        Logs the failure and sends an event before the Twisted handling.

        @param packet: returned packet from the host
        @type packet: object
        """
        message = "CHANNEL_FAILURE: Authentication failure"
        log.error( message )
        sendEvent( self, message=message )
        connection.SSHConnection.ssh_CHANNEL_FAILURE( self, packet )


    def ssh_CHANNEL_OPEN_FAILURE( self, packet):
        """
        Called when a CHANNEL_OPEN_FAILURE message is received from the
        device.  Logs the failure and sends an event before the Twisted
        handling.

        @param packet: returned packet from the host
        @type packet: object
        """
        message = "CHANNEL_OPEN_FAILURE: Try lowering zSshConcurrentSessions"
        log.error( message )
        sendEvent( self, message=message )
        connection.SSHConnection.ssh_CHANNEL_OPEN_FAILURE( self, packet )


    def ssh_REQUEST_FAILURE( self, packet):
        """
        Called when a REQUEST_FAILURE message is received from the device.
        Logs the failure and sends an event before the Twisted handling.

        @param packet: returned packet from the host
        @type packet: object
        """
        message = "REQUEST_FAILURE: Authentication failure"
        log.error( message )
        sendEvent( self, message=message )
        connection.SSHConnection.ssh_REQUEST_FAILURE( self, packet )


    def openFailed(self, reason):
        """
        Called when the connection open() fails.
        Usually this gets called after too many bad connection attempts,
        and the remote device gets upset with us.

        NB: reason.desc is the human-readable description of the failure
            reason.code is the SSH error code
        (see http://tools.ietf.org/html/rfc4250#section-4.2.2 for more details)

        @param reason: reason object
        @type reason: reason object
        """
        # This class has no 'command' attribute (that belongs to the
        # channel); name the device we were connecting to instead.
        message = 'SSH connection to %s failed (error code %d): %s' % \
                  (self.factory.hostname, reason.code, str(reason.desc) )
        log.error( message )
        sendEvent( self, message=message )

        # NOTE(review): openFailed looks like a channel-level callback and
        # may not be defined on connection.SSHConnection -- confirm.  Only
        # chain to the parent implementation when there is one.
        parentOpenFailed = getattr( connection.SSHConnection, 'openFailed', None )
        if parentOpenFailed is not None:
            parentOpenFailed( self, reason )


    def serviceStarted(self):
        """
        Called when the service is active on the transport
        """
        self.factory.serviceStarted(self)


    def addCommand(self, cmd):
        """
        Open a new channel for each command in queue

        @param cmd: command to run
        @type cmd: string
        """
        ch = CommandChannel(cmd, conn=self)
        self.openChannel(ch)


    def channelClosed(self, channel):
        """
        Called when a channel is closed.
        REQUIRED function by Twisted.

        @param channel: channel that closed
        @type channel: Twisted channel object
        """
        # grr.. patch SSH inherited method to deal with partially
        # configured channels: make sure both lookup tables hold an entry
        # for this channel before the parent implementation runs.
        self.localToRemoteChannel[channel.id] = None
        self.channelsToRemoteChannel[channel] = None
        connection.SSHConnection.channelClosed(self, channel)
504 505 506
class CommandChannel(channel.SSHChannel):
    """
    The class that actually interfaces between Zenoss and the device.
    """

    name = 'session'

    def __init__(self, command, conn=None):
        """
        Initializer

        @param command: command to run
        @type command: string
        @param conn: connection to create the channel on
        @type conn: Twisted connection object
        """
        channel.SSHChannel.__init__(self, conn=conn)
        self.command = command
        self.exitCode = None
        # Output buffer.  Created here (and reset in channelOpen) so that
        # dataReceived() and closed() are safe on a channel that never
        # finished opening.
        self.data = ''
        log.debug( "Started the channel for command: %s" % command )


    def openFailed(self, reason):
        """
        Called when the open fails.

        @param reason: either a ConchError (carrying 'data' and 'value')
                       or an object carrying 'code' and 'desc'
        @type reason: object
        """
        from twisted.conch.error import ConchError
        if isinstance(reason, ConchError):
            args = (reason.data, reason.value)
        else:
            args = (reason.code, reason.desc)
        # %s rather than %d for the code: it renders integers identically
        # and cannot fail if the failure carries no numeric code.
        message = 'Open of %s failed (error code %s): %s' % (
                (self.command,) + args)
        log.warn(message)
        sendEvent(self, message=message)
        channel.SSHChannel.openFailed(self, reason)
        self.conn.factory.clientFinished()


    def extReceived(self, dataType, data ):
        """
        Called when we receive extended data (usually standard error)

        @param dataType: data type code
        @type dataType: integer
        @param data: the extended data received from the device
        @type data: string
        """
        message = 'The command %s returned stderr data (%d) from the device: %s' \
                  % (self.command, dataType, data)
        log.warn( message )
        sendEvent( self, message=message )


    def channelOpen(self, unused):
        """
        Initialize the channel and send our command to the device.

        @param unused: unused (unused)
        @type unused: string
        @return: result of the connection's sendRequest() call
        @rtype: Twisted deferred object
        """

        log.debug('Opening command channel for %s' % self.command)
        self.data = ''

        #  Notes for sendRequest:
        # 'exec'      - execute the following command and exit
        # common.NS() - encodes the command as a length-prefixed string
        # wantReply   - reply to let us know the process has been started
        d = self.conn.sendRequest(self, 'exec', common.NS(self.command),
                                  wantReply=1)
        return d


    def request_exit_status(self, data):
        """
        Gathers the exit code from the device

        @param data: returned value from device
        @type data: packet
        """
        import struct
        # The exit status is unpacked as one big-endian unsigned 32-bit value
        self.exitCode = struct.unpack('>L', data)[0]
        log.debug("Exit code for %s is %d: %s",
                  self.command,
                  self.exitCode,
                  getExitMessage(self.exitCode))


    def dataReceived(self, data):
        """
        Response stream from the device.  Can be called multiple times.

        @param data: returned value from device
        @type data: string
        """
        self.data += data


    def closed(self):
        """
        Cleanup for the channel, as both ends have closed the channel.
        Hands the collected output and exit code to the factory.
        """
        log.debug('Closed command channel for command %s with data: %s' % \
                  (self.command, repr(self.data)))
        self.conn.factory.addResult(self.command, self.data, self.exitCode)
        self.loseConnection()

        self.conn.factory.channelClosed()
616 617 618
class SshClient(CollectorClient.CollectorClient):
    """
    SSH Collector class to connect to a particular device
    """

    def __init__(self, hostname, ip, port=22, plugins=None, options=None,
                    device=None, datacollector=None ):
        """
        Initializer

        @param hostname: hostname of the device
        @type hostname: string
        @param ip: IP address of the device
        @type ip: string
        @param port: port number to use to connect to device
        @type port: integer
        @param plugins: plugins (defaults to a new empty list)
        @type plugins: list of plugins
        @param options: options
        @type options: list
        @param device: name of device
        @type device: string
        @param datacollector: object
        @type datacollector: object
        """
        # A fresh list per instance instead of a shared mutable default
        if plugins is None:
            plugins = []

        CollectorClient.CollectorClient.__init__(self, hostname, ip, port,
                           plugins, options, device, datacollector)
        self.hostname = hostname
        self.protocol = SshClientTransport
        self.connection = None
        self.transport = None
        self.openSessions = 0
        self.workList = list(self.getCommands())


    def run(self):
        """
        Start SSH collection.
        """
        reactor.connectTCP(self.ip, self.port, self, self.loginTimeout)


    def _startNextCommand(self):
        """
        Take the next command off the work list and open a channel for it.
        """
        cmd = self.workList.pop(0)
        self.openSessions += 1
        self.connection.addCommand(cmd)


    def runCommands(self):
        """
        Start as many queued commands as the session limit allows.
        """
        # NOTE(review): one session fewer than concurrentSessions is ever
        # used here; the reason is not visible in this module -- confirm.
        availSessions = (self.concurrentSessions - 1) - self.openSessions
        for _ in range(min(len(self.workList), availSessions)):
            self._startNextCommand()


    def channelClosed(self):
        """
        Called by a command channel once it has closed.  Finishes the
        client when all commands are done, otherwise starts the next
        queued command (if any).
        """
        self.openSessions -= 1
        if self.commandsFinished():
            self.clientFinished()
            return

        if self.workList:
            self._startNextCommand()


    def serviceStarted(self, sshconn):
        """
        Run commands that are in the command queue

        @param sshconn: connection to create channels on
        @type sshconn: Twisted SSH connection
        """

        log.info("Connected to device %s" % self.hostname)
        self.connection = sshconn
        self.runCommands()


    def addCommand(self, commands):
        """
        Add a command or commands to queue and open a command
        channel for each command

        @param commands: commands to run
        @type commands: string or list of strings
        """

        CollectorClient.CollectorClient.addCommand(self, commands)
        # Treat byte strings and unicode strings alike as one command;
        # otherwise extend() would queue a unicode command one character
        # at a time.
        if isinstance(commands, (str, type(u''))):
            commands = (commands,)
        self.workList.extend(commands)

        # This code is required when we're reused by zencommand.
        if self.connection:
            self.runCommands()


    def clientConnectionFailed( self, connector, reason ):
        """
        If we didn't connect let the modeler know

        @param connector: connector associated with this failure
        @type connector: object
        @param reason: failure object
        @type reason: object
        """
        from Products.ZenUtils.Utils import unused
        unused(connector)
        message = reason.getErrorMessage()
        log.error( message )
        sendEvent( self, device=self.hostname, message=message )
        self.clientFinished()


    def loseConnection(self):
        """
        Called when the connection gets closed.
        Only logs; the call on self.connection below is left disabled.
        """
        log.debug( "Connection closed" )
        #self.connection.loseConnection()
def main():
    """
    Test harness main()

    Usage:

    python SshClient.py hostname[:port] command [command]

    Each command must be enclosed in quotes (") to be interpreted
    properly as a complete unit.

    Results gathered by the client are pretty-printed once the reactor
    has stopped.
    """
    import socket
    from itertools import chain
    import pprint

    logging.basicConfig()

    optionParser = CollectorClient.buildOptions()
    options = CollectorClient.parseOptions(optionParser, 22)
    log.setLevel(options.logseverity)

    hostname = options.hostname
    address = socket.gethostbyname(hostname)
    sshClient = SshClient(hostname, address, options.port, options=options)

    # Rather than getting info from zenhub, just pass our
    # commands in
    def commandsFromOptions():
        return chain( options.commands )

    sshClient.getCommands = commandsFromOptions

    sshClient.run()

    # Stop the reactor (and so fall through to the report below) as soon
    # as the client declares itself finished.
    sshClient.clientFinished = reactor.stop
    sshClient._commands.append( options.commands )
    reactor.run()

    pprint.pprint(sshClient.getResults())


if __name__ == '__main__':
    main()