Package DataCollector :: Module SshClient
[hide private]
[frames] | no frames]

Source Code for Module DataCollector.SshClient

  1  ########################################################################### 
  2  # 
  3  # This program is part of Zenoss Core, an open source monitoring platform. 
  4  # Copyright (C) 2007, Zenoss Inc. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify it 
  7  # under the terms of the GNU General Public License version 2 as published by 
  8  # the Free Software Foundation. 
  9  # 
 10  # For complete information please visit: http://www.zenoss.com/oss/ 
 11  # 
 12  ########################################################################### 
 13   
 14  __doc__="""SshClient runs commands on a remote box using SSH and 
 15  returns their results. 
 16   
 17  See http://twistedmatrix.com/trac/wiki/Documentation for Twisted documentation, 
 18  specifically documentation on 'conch' (Twisted's SSH protocol support). 
 19  """ 
 20   
 21  import os 
 22  import sys 
 23  import logging 
 24  log = logging.getLogger("zen.SshClient") 
 25   
 26  import Globals 
 27   
 28  from twisted.conch.ssh import transport, userauth, connection 
 29  from twisted.conch.ssh import common, channel 
 30  from twisted.conch.ssh.keys import Key 
 31  from twisted.internet import defer, reactor 
 32  from Products.ZenEvents import Event 
 33  from Products.ZenUtils.Utils import getExitMessage 
 34   
 35  from Exceptions import * 
 36   
 37  import CollectorClient 
 38   
 39  # NB: Most messages returned back from Twisted are Unicode. 
 40  #     Expect to use str() to convert to ASCII before dumping out. :) 
 41   
 42   
43 -def sendEvent( self, message="", device='', severity=Event.Error ):
44 """ 45 Shortcut version of sendEvent() 46 47 @param message: message to send in Zenoss event 48 @type message: string 49 @param device: hostname of device to which this event is associated 50 @type device: string 51 @param severity: Zenoss severity from Products.ZenEvents 52 @type severity: integer 53 """ 54 55 # Parse out the daemon's name 56 component= os.path.basename( sys.argv[0] ).replace( '.py', '' ) 57 58 def hasattr_path( object_root, path ): 59 """ 60 The regular hasattr() only works on one component, 61 not multiples. 62 63 @param object_root: object to start searching for path 64 @type object_root: object 65 @param path: path to func or variable (eg "conn.factory" ) 66 @type path: string 67 @return: is object_root.path sane? 68 @rtype: boolean 69 """ 70 obj = object_root 71 for chunk in path.split('.'): 72 obj= getattr( obj, chunk, None ) 73 if obj is None: 74 return False 75 return True
76 77 # ... and the device's name (as known by Zenoss) 78 if device == '': 79 if hasattr_path( self, "factory.hostname" ): 80 device= self.factory.hostname 81 82 elif hasattr_path( self, "conn.factory.hostname" ): 83 device= self.conn.factory.hostname 84 85 else: 86 log.debug( "Couldn't get the remote device's hostname" ) 87 88 error_event= { 89 'agent': component, 90 'summary': message, 91 'device': device, 92 'eventClass': "/Cmd/Fail", 93 'component': component, 94 'severity': severity, 95 } 96 97 # At this point, we don't know what we have 98 try: 99 if hasattr_path( self, "factory.datacollector.sendEvent" ): 100 self.factory.datacollector.sendEvent( error_event ) 101 102 elif hasattr_path( self, "datacollector.sendEvent" ): 103 self.datacollector.sendEvent( error_event ) 104 105 elif hasattr_path( self, "conn.factory.datacollector.sendEvent" ): 106 self.conn.factory.datacollector.sendEvent( error_event ) 107 108 else: 109 log.debug( "Unable to send event for %s" % error_event ) 110 111 except: 112 pass # Don't cause other issues 113 114 115
116 -class SshClientError( Exception ):
117 """ 118 Exception class 119 """
120 121 122
123 -class SshClientTransport(transport.SSHClientTransport):
124 """ 125 Base client class for constructing Twisted Conch services. 126 This class is *only* responsible for connecting to the SSH 127 service on the device, and ensuring that *host* keys are sane. 128 """ 129
130 - def verifyHostKey(self, hostKey, fingerprint):
131 """ 132 Module to verify the host's SSH key against the stored fingerprint we have 133 from the last time that we communicated with the host. 134 135 NB: currently does not verify this information but simply trusts every host key 136 137 @param hostKey: host's SSH key (unused) 138 @type hostKey: string 139 @param fingerprint: host fingerprint (unused) 140 @type fingerprint: string 141 @return: Twisted deferred object 142 @rtype: Twisted deferred object (defer.succeed(1) 143 @todo: verify the host key 144 """ 145 #blowing off host key right now, should store and check 146 from Products.ZenUtils.Utils import unused 147 unused(hostKey) 148 log.debug('%s host fingerprint: %s' % (self.factory.hostname, fingerprint)) 149 return defer.succeed(1)
150 151
152 - def connectionMade(self):
153 """ 154 Called after the connection has been made. 155 Used to set up private instance variables. 156 """ 157 self.factory.transport = self.transport 158 transport.SSHClientTransport.connectionMade(self)
159 160
161 - def receiveError( self, reasonCode, description ):
162 """ 163 Called when a disconnect error message was received from the device. 164 165 @param reasonCode: error code from SSH connection failure 166 @type reasonCode: integer 167 @param description: human-readable version of the error code 168 @type description: string 169 """ 170 message= 'SSH error from remote device (code %d): %s\n' % \ 171 ( reasonCode, str( description ) ) 172 log.warn( message ) 173 sendEvent( self, message=message ) 174 transport.SSHClientTransport.receiveError(self, reasonCode, description )
175 176
177 - def receiveUnimplemented( self, seqnum ):
178 """ 179 Called when an unimplemented packet message was received from the device. 180 181 @param seqnum: SSH message code 182 @type seqnum: integer 183 """ 184 message= "Got 'unimplemented' SSH message, seqnum= %d" % seqnum 185 log.info( message ) 186 sendEvent( self, message=message ) 187 transport.SSHClientTransport.receiveUnimplemented(self, seqnum)
188 189
190 - def receiveDebug( self, alwaysDisplay, message, lang ):
191 """ 192 Called when a debug message was received from the device. 193 194 @param alwaysDisplay: boolean-type code to indicate if the message is to be displayed 195 @type alwaysDisplay: integer 196 @param message: debug message from remote device 197 @type message: string 198 @param lang: language code 199 @type lang: integer 200 """ 201 message= "Debug message from remote device (%s): %s" % ( str(lang), str(message) ) 202 log.info( message ) 203 sendEvent( self, message=message, severity=Event.Debug ) 204 205 transport.SSHClientTransport.receiveDebug(self, alwaysDisplay, message, lang )
206 207
208 - def connectionSecure(self):
209 """ 210 This is called after the connection is set up and other services can be run. 211 This function starts the SshUserAuth client (ie the Connection client). 212 """ 213 sshconn = SshConnection(self.factory) 214 sshauth = SshUserAuth(self.factory.username, sshconn, self.factory) 215 self.requestService(sshauth)
216 217 218
219 -class SshUserAuth(userauth.SSHUserAuthClient):
220 """ 221 Class to gather credentials for use with our SSH connection, 222 and use them to authenticate against the remote device. 223 """ 224
225 - def __init__(self, user, instance, factory):
226 """ 227 If no username is supplied, defaults to the user running this code (eg zenoss) 228 229 @param user: username 230 @type user: string 231 @param instance: instance object 232 @type instance: object 233 @param factory: factory info 234 @type factory: Twisted factory object 235 """ 236 user = str(user) # damn unicode 237 if user == '': 238 log.debug( "Unable to determine username/password from " + \ 239 "zCommandUser/zCommandPassword" ) 240 241 # From the Python docs about the preferred method of 242 # obtaining user name in preference to os.getlogin() 243 # (http://docs.python.org/library/os.html) 244 import pwd 245 try: 246 user = os.environ.get( 'LOGNAME', pwd.getpwuid(os.getuid())[0] ) 247 except: 248 pass 249 250 if user == '': 251 message= "No zProperties defined and unable to determine current user." 252 log.error( message ) 253 sendEvent( self, message=message ) 254 raise SshClientError( message ) 255 256 userauth.SSHUserAuthClient.__init__(self, user, instance) 257 self.user = user 258 self.factory = factory 259 self._key = self._getKey()
260 261
262 - def getPassword(self, unused=None):
263 """ 264 Return a deferred object of success if there's a password or 265 return fail (ie no zCommandPassword specified) 266 267 @param unused: unused (unused) 268 @type unused: string 269 @return: Twisted deferred object (defer.succeed or defer.fail) 270 @rtype: Twisted deferred object 271 """ 272 273 if not self.factory.password: 274 message= "SshUserAuth: no password found -- " + \ 275 "has zCommandPassword been set?" 276 log.error( message ) 277 sendEvent( self, message=message ) 278 self.factory.clientFinished() 279 return defer.fail( SshClientError( message ) ) 280 281 else: 282 return defer.succeed(self.factory.password)
283
284 - def _getKey(self):
285 keyPath = os.path.expanduser(self.factory.keyPath) 286 log.debug('Expanded SSH key path from zKeyPath %s to %s' % ( 287 self.factory.keyPath, keyPath)) 288 if os.path.exists(keyPath): 289 data = ''.join(open(keyPath).readlines()).strip() 290 key = Key.fromString(data, 291 passphrase=self.factory.password) 292 else: 293 key = None 294 log.debug( "SSH key path %s doesn't exist" % keyPath ) 295 return key
296
297 - def getPublicKey(self):
298 """ 299 Return the SSH public key (using the zProperty zKeyPath) or None 300 301 @return: SSH public key 302 @rtype: string 303 """ 304 if self._key is not None: 305 return self._key.blob()
306
307 - def getPrivateKey(self):
308 """ 309 Return a deferred with the SSH private key (using the zProperty zKeyPath) 310 311 @return: Twisted deferred object (defer.succeed) 312 @rtype: Twisted deferred object 313 """ 314 if self._key is None: 315 keyObject = None 316 else: 317 keyObject = self._key.keyObject 318 return defer.succeed(keyObject)
319
320 - def ssh_USERAUTH_FAILURE( self, packet):
321 """ 322 Called when the SSH session can't authenticate. 323 NB: This function is also called as an initializer 324 to start the connections. 325 326 @param packet: returned packet from the host 327 @type packet: object 328 """ 329 from twisted.conch.ssh.common import getNS 330 canContinue, partial = getNS(packet) 331 canContinue = canContinue.split(',') 332 333 from Products.ZenUtils.Utils import unused 334 unused(partial) 335 336 lastAuth= getattr( self, "lastAuth", '') 337 if lastAuth == '' or lastAuth == 'none': 338 pass # Start our connection 339 340 elif lastAuth == 'publickey': 341 self.authenticatedWith.append(self.lastAuth) 342 message= "SSH login to %s with SSH keys failed" % \ 343 self.factory.hostname 344 log.error( message ) 345 sendEvent( self, message=message ) 346 347 elif lastAuth == 'password': 348 message= "SSH login to %s with username %s failed" % \ 349 ( self.factory.hostname, self.user ) 350 log.error( message ) 351 sendEvent( self, message=message ) 352 353 self.factory.loginTries -= 1 354 log.debug( "Decremented loginTries count to %d" % self.factory.loginTries ) 355 356 if self.factory.loginTries <= 0: 357 message= "SSH connection aborted after maximum login attempts." 358 log.error( message ) 359 sendEvent( self, message=message ) 360 361 else: 362 return self.tryAuth('password') 363 364 365 self.authenticatedWith.append(self.lastAuth) 366 367 # Straight from the nasty Twisted code 368 def _(x, y): 369 try: 370 i1 = self.preferredOrder.index(x) 371 except ValueError: 372 return 1 373 try: 374 i2 = self.preferredOrder.index(y) 375 except ValueError: 376 return -1 377 return cmp(i1, i2)
378 379 canContinue.sort(_) 380 log.debug( 'Sorted list of authentication methods: %s' % canContinue) 381 for method in canContinue: 382 if method not in self.authenticatedWith: 383 log.debug( "Attempting method %s" % method ) 384 if self.tryAuth(method): 385 return 386 387 log.debug( "All authentication methods attempted" ) 388 self.factory.clientFinished() 389 self.transport.sendDisconnect(transport.DISCONNECT_NO_MORE_AUTH_METHODS_AVAILABLE, 'No more authentication methods available')
390
391 -class SshConnection(connection.SSHConnection):
392 """ 393 Wrapper class that starts channels on top of connections. 394 """ 395
396 - def __init__(self, factory):
397 """ 398 Initializer 399 400 @param factory: factory containing the connection info 401 @type factory: Twisted factory object 402 """ 403 log.debug( "Creating new SSH connection..." ) 404 connection.SSHConnection.__init__(self) 405 self.factory = factory
406 407
408 - def ssh_CHANNEL_FAILURE( self, packet):
409 """ 410 Called when the SSH session can't authenticate 411 412 @param packet: returned packet from the host 413 @type packet: object 414 """ 415 message= "CHANNEL_FAILURE: Authentication failure" 416 log.error( message ) 417 sendEvent( self, message=message ) 418 connection.SSHConnection.ssh_CHANNEL_FAILURE( self, packet )
419 420
421 - def ssh_CHANNEL_OPEN_FAILURE( self, packet):
422 """ 423 Called when the SSH session can't authenticate 424 425 @param packet: returned packet from the host 426 @type packet: object 427 """ 428 message= "CHANNEL_OPEN_FAILURE: Try lowering zSshConcurrentSessions" 429 log.error( message ) 430 sendEvent( self, message=message ) 431 connection.SSHConnection.ssh_CHANNEL_OPEN_FAILURE( self, packet )
432 433
434 - def ssh_REQUEST_FAILURE( self, packet):
435 """ 436 Called when the SSH session can't authenticate 437 438 @param packet: returned packet from the host 439 """ 440 message= "REQUEST_FAILURE: Authentication failure" 441 log.error( message ) 442 sendEvent( self, message=message ) 443 connection.SSHConnection.ssh_REQUEST_FAILURE( self, packet )
444 445
446 - def openFailed(self, reason):
447 """ 448 Called when the connection open() fails. 449 Usually this gets called after too many bad connection attempts, 450 and the remote device gets upset with us. 451 452 NB: reason.desc is the human-readable description of the failure 453 reason.code is the SSH error code 454 (see http://tools.ietf.org/html/rfc4250#section-4.2.2 for more details) 455 456 @param reason: reason object 457 @type reason: reason object 458 """ 459 460 message= 'SSH connection to %s failed (error code %d): %s' % \ 461 (self.command, reason.code, str(reason.desc) ) 462 log.error( message ) 463 sendEvent( self, message=message ) 464 connection.SSHConnection.openFailed( self, reason )
465 466
467 - def serviceStarted(self):
468 """ 469 Called when the service is active on the transport 470 """ 471 self.factory.serviceStarted(self)
472 473
474 - def addCommand(self, cmd):
475 """ 476 Open a new channel for each command in queue 477 478 @param cmd: command to run 479 @type cmd: string 480 """ 481 ch = CommandChannel(cmd, conn=self) 482 self.openChannel(ch)
483 484
485 - def channelClosed(self, channel):
486 """ 487 Called when a channel is closed. 488 REQUIRED function by Twisted. 489 490 @param channel: channel that closed 491 @type channel: Twisted channel object 492 """ 493 # grr.. patch SSH inherited method to deal with partially 494 # configured channels 495 self.localToRemoteChannel[channel.id] = None 496 self.channelsToRemoteChannel[channel] = None 497 connection.SSHConnection.channelClosed(self, channel)
498 499 500
501 -class CommandChannel(channel.SSHChannel):
502 """ 503 The class that actually interfaces between Zenoss and the device. 504 """ 505 506 name = 'session' 507 conn = None 508
509 - def __init__(self, command, conn=None):
510 """ 511 Initializer 512 513 @param command: command to run 514 @type command: string 515 @param conn: connection to create the channel on 516 @type conn: Twisted connection object 517 """ 518 channel.SSHChannel.__init__(self, conn=conn) 519 self.command = command 520 self.exitCode = None 521 log.debug( "Started the channel for command: %s" % command )
522 523
524 - def openFailed(self, reason):
525 """ 526 Called when the open fails. 527 """ 528 from twisted.conch.error import ConchError 529 if isinstance(reason, ConchError): 530 args = (reason.data, reason.value) 531 else: 532 args = (reason.code, reason.desc) 533 message = 'Open of %s failed (error code %d): %s' % ( 534 (self.command,) + args) 535 log.warn(message) 536 sendEvent(self, message=message) 537 channel.SSHChannel.openFailed(self, reason) 538 if self.conn is not None: 539 self.conn.factory.clientFinished()
540 541
542 - def extReceived(self, dataType, data ):
543 """ 544 Called when we receive extended data (usually standard error) 545 546 @param dataType: data type code 547 @type dataType: integer 548 """ 549 message= 'The command %s returned stderr data (%d) from the device: %s' \ 550 % (self.command, dataType, data) 551 log.warn( message ) 552 sendEvent( self, message=message )
553 554
555 - def channelOpen(self, unused):
556 """ 557 Initialize the channel and send our command to the device. 558 559 @param unused: unused (unused) 560 @type unused: string 561 @return: Twisted channel 562 @rtype: Twisted channel 563 """ 564 565 log.debug('Opening command channel for %s' % self.command) 566 self.data = '' 567 568 # Notes for sendRequest: 569 # 'exec' - execute the following command and exit 570 # common.NS() - encodes the command as a length-prefixed string 571 # wantReply - reply to let us know the process has been started 572 d = self.conn.sendRequest(self, 'exec', common.NS(self.command), 573 wantReply=1) 574 return d
575 576
577 - def request_exit_status(self, data):
578 """ 579 Gathers the exit code from the device 580 581 @param data: returned value from device 582 @type data: packet 583 """ 584 import struct 585 self.exitCode = struct.unpack('>L', data)[0] 586 log.debug("Exit code for %s is %d: %s", 587 self.command, 588 self.exitCode, 589 getExitMessage(self.exitCode))
590 591
592 - def dataReceived(self, data):
593 """ 594 Response stream from the device. Can be called multiple times. 595 596 @param data: returned value from device 597 @type data: string 598 """ 599 self.data += data
600 601
602 - def closed(self):
603 """ 604 Cleanup for the channel, as both ends have closed the channel. 605 """ 606 log.debug('Closed command channel for command %s with data: %s' % \ 607 (self.command, repr(self.data))) 608 self.conn.factory.addResult(self.command, self.data, self.exitCode) 609 self.loseConnection() 610 611 self.conn.factory.channelClosed()
612 613 614
615 -class SshClient(CollectorClient.CollectorClient):
616 """ 617 SSH Collector class to connect to a particular device 618 """ 619
620 - def __init__(self, hostname, ip, port=22, plugins=[], options=None, 621 device=None, datacollector=None, isLoseConnection=False):
622 """ 623 Initializer 624 625 @param hostname: hostname of the device 626 @type hostname: string 627 @param ip: IP address of the device 628 @type ip: string 629 @param port: port number to use to connect to device 630 @type port: integer 631 @param plugins: plugins 632 @type plugins: list of plugins 633 @param options: options 634 @type options: list 635 @param device: name of device 636 @type device: string 637 @param datacollector: object 638 @type datacollector: object 639 """ 640 641 CollectorClient.CollectorClient.__init__(self, hostname, ip, port, 642 plugins, options, device, datacollector) 643 self.hostname = hostname 644 self.protocol = SshClientTransport 645 self.connection = None 646 self.transport = None 647 self.openSessions = 0 648 self.workList = list(self.getCommands()) 649 self.isLoseConnection = isLoseConnection
650
651 - def run(self):
652 """ 653 Start SSH collection. 654 """ 655 reactor.connectTCP(self.ip, self.port, self, self.loginTimeout)
656 657
658 - def runCommands(self):
659 availSessions = (self.concurrentSessions - 1) - self.openSessions 660 for i in range(min(len(self.workList), availSessions)): 661 cmd = self.workList.pop(0) 662 self.openSessions += 1 663 self.connection.addCommand(cmd)
664 665
666 - def channelClosed(self):
667 self.openSessions -= 1 668 if self.commandsFinished(): 669 if self.isLoseConnection: 670 self.transport.loseConnection() 671 self.clientFinished() 672 return 673 674 if self.workList: 675 cmd = self.workList.pop(0) 676 self.openSessions += 1 677 self.connection.addCommand(cmd)
678 679
680 - def serviceStarted(self, sshconn):
681 """ 682 Run commands that are in the command queue 683 684 @param sshconn: connection to create channels on 685 @type sshconn: Twisted SSH connection 686 """ 687 688 log.info("Connected to device %s" % self.hostname) 689 self.connection = sshconn 690 self.runCommands()
691 692
693 - def addCommand(self, commands):
694 """ 695 Add a command or commands to queue and open a command 696 channel for each command 697 698 @param commands: commands to run 699 @type commands: list 700 """ 701 702 CollectorClient.CollectorClient.addCommand(self, commands) 703 if type(commands) == type(''): 704 commands = (commands,) 705 self.workList.extend(commands) 706 707 # This code is required when we're reused by zencommand. 708 if self.connection: 709 self.runCommands()
710 711
712 - def clientConnectionFailed( self, connector, reason ):
713 """ 714 If we didn't connect let the modeler know 715 716 @param connector: connector associated with this failure 717 @type connector: object 718 @param reason: failure object 719 @type reason: object 720 """ 721 from Products.ZenUtils.Utils import unused 722 unused(connector) 723 message= reason.getErrorMessage() 724 log.error( message ) 725 sendEvent( self, device=self.hostname, message=message ) 726 self.clientFinished()
727 728
729 - def loseConnection(self):
730 """ 731 Called when the connection gets closed. 732 """ 733 log.debug( "Connection closed" )
734 #self.connection.loseConnection() 735 736 737
738 -def main():
739 """ 740 Test harness main() 741 742 Usage: 743 744 python SshClient.py hostname[:port] comand [command] 745 746 Each command must be enclosed in quotes (") to be interpreted 747 properly as a complete unit. 748 """ 749 import socket 750 from itertools import chain 751 import pprint 752 753 logging.basicConfig() 754 755 parser = CollectorClient.buildOptions() 756 options = CollectorClient.parseOptions(parser,22) 757 log.setLevel(options.logseverity) 758 759 client = SshClient(options.hostname, 760 socket.gethostbyname(options.hostname), 761 options.port, 762 options=options) 763 764 # Rather than getting info from zenhub, just pass our 765 # commands in 766 client.getCommands= lambda: chain( options.commands ) 767 768 client.run() 769 770 client.clientFinished= reactor.stop 771 client._commands.append( options.commands ) 772 reactor.run() 773 774 pprint.pprint(client.getResults())
775 776 777 if __name__ == '__main__': 778 main() 779