You've already forked linux-packaging-mono
							
							
		
			
	
	
		
			368 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			368 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
|   | """This class extends pexpect.spawn to specialize setting up SSH connections.
 | ||
|  | This adds methods for login, logout, and expecting the shell prompt. | ||
|  | 
 | ||
|  | $Id: pxssh.py 513 2008-02-09 18:26:13Z noah $ | ||
|  | """
 | ||
|  | 
 | ||
|  | from pexpect import * | ||
|  | import pexpect | ||
|  | import time | ||
|  | 
 | ||
|  | __all__ = ['ExceptionPxssh', 'pxssh'] | ||
|  | 
 | ||
|  | # Exception classes used by this module. | ||
|  | 
 | ||
|  | 
 | ||
|  | class ExceptionPxssh(ExceptionPexpect): | ||
|  |     """Raised for pxssh exceptions.
 | ||
|  |     """
 | ||
|  | 
 | ||
|  | 
 | ||
|  | class pxssh (spawn): | ||
|  | 
 | ||
|  |     """This class extends pexpect.spawn to specialize setting up SSH
 | ||
|  |     connections. This adds methods for login, logout, and expecting the shell | ||
|  |     prompt. It does various tricky things to handle many situations in the SSH | ||
|  |     login process. For example, if the session is your first login, then pxssh | ||
|  |     automatically accepts the remote certificate; or if you have public key | ||
|  |     authentication setup then pxssh won't wait for the password prompt. | ||
|  | 
 | ||
|  |     pxssh uses the shell prompt to synchronize output from the remote host. In | ||
|  |     order to make this more robust it sets the shell prompt to something more | ||
|  |     unique than just $ or #. This should work on most Borne/Bash or Csh style | ||
|  |     shells. | ||
|  | 
 | ||
|  |     Example that runs a few commands on a remote server and prints the result:: | ||
|  | 
 | ||
|  |         import pxssh | ||
|  |         import getpass | ||
|  |         try: | ||
|  |             s = pxssh.pxssh() | ||
|  |             hostname = raw_input('hostname: ') | ||
|  |             username = raw_input('username: ') | ||
|  |             password = getpass.getpass('password: ') | ||
|  |             s.login (hostname, username, password) | ||
|  |             s.sendline ('uptime')  # run a command | ||
|  |             s.prompt()             # match the prompt | ||
|  |             print s.before         # print everything before the prompt. | ||
|  |             s.sendline ('ls -l') | ||
|  |             s.prompt() | ||
|  |             print s.before | ||
|  |             s.sendline ('df') | ||
|  |             s.prompt() | ||
|  |             print s.before | ||
|  |             s.logout() | ||
|  |         except pxssh.ExceptionPxssh, e: | ||
|  |             print "pxssh failed on login." | ||
|  |             print str(e) | ||
|  | 
 | ||
|  |     Note that if you have ssh-agent running while doing development with pxssh | ||
|  |     then this can lead to a lot of confusion. Many X display managers (xdm, | ||
|  |     gdm, kdm, etc.) will automatically start a GUI agent. You may see a GUI | ||
|  |     dialog box popup asking for a password during development. You should turn | ||
|  |     off any key agents during testing. The 'force_password' attribute will turn | ||
|  |     off public key authentication. This will only work if the remote SSH server | ||
|  |     is configured to allow password logins. Example of using 'force_password' | ||
|  |     attribute:: | ||
|  | 
 | ||
|  |             s = pxssh.pxssh() | ||
|  |             s.force_password = True | ||
|  |             hostname = raw_input('hostname: ') | ||
|  |             username = raw_input('username: ') | ||
|  |             password = getpass.getpass('password: ') | ||
|  |             s.login (hostname, username, password) | ||
|  |     """
 | ||
|  | 
 | ||
|  |     def __init__( | ||
|  |             self, | ||
|  |             timeout=30, | ||
|  |             maxread=2000, | ||
|  |             searchwindowsize=None, | ||
|  |             logfile=None, | ||
|  |             cwd=None, | ||
|  |             env=None): | ||
|  |         spawn.__init__( | ||
|  |             self, | ||
|  |             None, | ||
|  |             timeout=timeout, | ||
|  |             maxread=maxread, | ||
|  |             searchwindowsize=searchwindowsize, | ||
|  |             logfile=logfile, | ||
|  |             cwd=cwd, | ||
|  |             env=env) | ||
|  | 
 | ||
|  |         self.name = '<pxssh>' | ||
|  | 
 | ||
|  |         # SUBTLE HACK ALERT! Note that the command to set the prompt uses a | ||
|  |         # slightly different string than the regular expression to match it. This | ||
|  |         # is because when you set the prompt the command will echo back, but we | ||
|  |         # don't want to match the echoed command. So if we make the set command | ||
|  |         # slightly different than the regex we eliminate the problem. To make the | ||
|  |         # set command different we add a backslash in front of $. The $ doesn't | ||
|  |         # need to be escaped, but it doesn't hurt and serves to make the set | ||
|  |         # prompt command different than the regex. | ||
|  | 
 | ||
|  |         # used to match the command-line prompt | ||
|  |         self.UNIQUE_PROMPT = "\[PEXPECT\][\$\#] " | ||
|  |         self.PROMPT = self.UNIQUE_PROMPT | ||
|  | 
 | ||
|  |         # used to set shell command-line prompt to UNIQUE_PROMPT. | ||
|  |         self.PROMPT_SET_SH = "PS1='[PEXPECT]\$ '" | ||
|  |         self.PROMPT_SET_CSH = "set prompt='[PEXPECT]\$ '" | ||
|  |         self.SSH_OPTS = "-o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'" | ||
|  |         # Disabling X11 forwarding gets rid of the annoying SSH_ASKPASS from | ||
|  |         # displaying a GUI password dialog. I have not figured out how to | ||
|  |         # disable only SSH_ASKPASS without also disabling X11 forwarding. | ||
|  |         # Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying! | ||
|  |         #self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'" | ||
|  |         self.force_password = False | ||
|  |         self.auto_prompt_reset = True | ||
|  | 
 | ||
|  |     def levenshtein_distance(self, a, b): | ||
|  |         """This calculates the Levenshtein distance between a and b.
 | ||
|  |         """
 | ||
|  | 
 | ||
|  |         n, m = len(a), len(b) | ||
|  |         if n > m: | ||
|  |             a, b = b, a | ||
|  |             n, m = m, n | ||
|  |         current = range(n + 1) | ||
|  |         for i in range(1, m + 1): | ||
|  |             previous, current = current, [i] + [0] * n | ||
|  |             for j in range(1, n + 1): | ||
|  |                 add, delete = previous[j] + 1, current[j - 1] + 1 | ||
|  |                 change = previous[j - 1] | ||
|  |                 if a[j - 1] != b[i - 1]: | ||
|  |                     change = change + 1 | ||
|  |                 current[j] = min(add, delete, change) | ||
|  |         return current[n] | ||
|  | 
 | ||
|  |     def sync_original_prompt(self): | ||
|  |         """This attempts to find the prompt. Basically, press enter and record
 | ||
|  |         the response; press enter again and record the response; if the two | ||
|  |         responses are similar then assume we are at the original prompt. This | ||
|  |         is a slow function. It can take over 10 seconds. """
 | ||
|  | 
 | ||
|  |         # All of these timing pace values are magic. | ||
|  |         # I came up with these based on what seemed reliable for | ||
|  |         # connecting to a heavily loaded machine I have. | ||
|  |         # If latency is worse than these values then this will fail. | ||
|  | 
 | ||
|  |         try: | ||
|  |             # GAS: Clear out the cache before getting the prompt | ||
|  |             self.read_nonblocking(size=10000, timeout=1) | ||
|  |         except TIMEOUT: | ||
|  |             pass | ||
|  |         time.sleep(0.1) | ||
|  |         self.sendline() | ||
|  |         time.sleep(0.5) | ||
|  |         x = self.read_nonblocking(size=1000, timeout=1) | ||
|  |         time.sleep(0.1) | ||
|  |         self.sendline() | ||
|  |         time.sleep(0.5) | ||
|  |         a = self.read_nonblocking(size=1000, timeout=1) | ||
|  |         time.sleep(0.1) | ||
|  |         self.sendline() | ||
|  |         time.sleep(0.5) | ||
|  |         b = self.read_nonblocking(size=1000, timeout=1) | ||
|  |         ld = self.levenshtein_distance(a, b) | ||
|  |         len_a = len(a) | ||
|  |         if len_a == 0: | ||
|  |             return False | ||
|  |         if float(ld) / len_a < 0.4: | ||
|  |             return True | ||
|  |         return False | ||
|  | 
 | ||
|  |     # TODO: This is getting messy and I'm pretty sure this isn't perfect. | ||
|  |     # TODO: I need to draw a flow chart for this. | ||
|  |     def login( | ||
|  |             self, | ||
|  |             server, | ||
|  |             username, | ||
|  |             password='', | ||
|  |             terminal_type='ansi', | ||
|  |             original_prompt=r"[#$]", | ||
|  |             login_timeout=10, | ||
|  |             port=None, | ||
|  |             auto_prompt_reset=True): | ||
|  |         """This logs the user into the given server. It uses the
 | ||
|  |         'original_prompt' to try to find the prompt right after login. When it | ||
|  |         finds the prompt it immediately tries to reset the prompt to something | ||
|  |         more easily matched. The default 'original_prompt' is very optimistic | ||
|  |         and is easily fooled. It's more reliable to try to match the original | ||
|  |         prompt as exactly as possible to prevent false matches by server | ||
|  |         strings such as the "Message Of The Day". On many systems you can | ||
|  |         disable the MOTD on the remote server by creating a zero-length file | ||
|  |         called "~/.hushlogin" on the remote server. If a prompt cannot be found | ||
|  |         then this will not necessarily cause the login to fail. In the case of | ||
|  |         a timeout when looking for the prompt we assume that the original | ||
|  |         prompt was so weird that we could not match it, so we use a few tricks | ||
|  |         to guess when we have reached the prompt. Then we hope for the best and | ||
|  |         blindly try to reset the prompt to something more unique. If that fails | ||
|  |         then login() raises an ExceptionPxssh exception. | ||
|  | 
 | ||
|  |         In some situations it is not possible or desirable to reset the | ||
|  |         original prompt. In this case, set 'auto_prompt_reset' to False to | ||
|  |         inhibit setting the prompt to the UNIQUE_PROMPT. Remember that pxssh | ||
|  |         uses a unique prompt in the prompt() method. If the original prompt is | ||
|  |         not reset then this will disable the prompt() method unless you | ||
|  |         manually set the PROMPT attribute. """
 | ||
|  | 
 | ||
|  |         ssh_options = '-q' | ||
|  |         if self.force_password: | ||
|  |             ssh_options = ssh_options + ' ' + self.SSH_OPTS | ||
|  |         if port is not None: | ||
|  |             ssh_options = ssh_options + ' -p %s' % (str(port)) | ||
|  |         cmd = "ssh %s -l %s %s" % (ssh_options, username, server) | ||
|  | 
 | ||
|  |         # This does not distinguish between a remote server 'password' prompt | ||
|  |         # and a local ssh 'passphrase' prompt (for unlocking a private key). | ||
|  |         spawn._spawn(self, cmd) | ||
|  |         i = self.expect( | ||
|  |             [ | ||
|  |                 "(?i)are you sure you want to continue connecting", | ||
|  |                 original_prompt, | ||
|  |                 "(?i)(?:password)|(?:passphrase for key)", | ||
|  |                 "(?i)permission denied", | ||
|  |                 "(?i)terminal type", | ||
|  |                 TIMEOUT, | ||
|  |                 "(?i)connection closed by remote host"], | ||
|  |             timeout=login_timeout) | ||
|  | 
 | ||
|  |         # First phase | ||
|  |         if i == 0: | ||
|  |             # New certificate -- always accept it. | ||
|  |             # This is what you get if SSH does not have the remote host's | ||
|  |             # public key stored in the 'known_hosts' cache. | ||
|  |             self.sendline("yes") | ||
|  |             i = self.expect( | ||
|  |                 [ | ||
|  |                     "(?i)are you sure you want to continue connecting", | ||
|  |                     original_prompt, | ||
|  |                     "(?i)(?:password)|(?:passphrase for key)", | ||
|  |                     "(?i)permission denied", | ||
|  |                     "(?i)terminal type", | ||
|  |                     TIMEOUT]) | ||
|  |         if i == 2:  # password or passphrase | ||
|  |             self.sendline(password) | ||
|  |             i = self.expect( | ||
|  |                 [ | ||
|  |                     "(?i)are you sure you want to continue connecting", | ||
|  |                     original_prompt, | ||
|  |                     "(?i)(?:password)|(?:passphrase for key)", | ||
|  |                     "(?i)permission denied", | ||
|  |                     "(?i)terminal type", | ||
|  |                     TIMEOUT]) | ||
|  |         if i == 4: | ||
|  |             self.sendline(terminal_type) | ||
|  |             i = self.expect( | ||
|  |                 [ | ||
|  |                     "(?i)are you sure you want to continue connecting", | ||
|  |                     original_prompt, | ||
|  |                     "(?i)(?:password)|(?:passphrase for key)", | ||
|  |                     "(?i)permission denied", | ||
|  |                     "(?i)terminal type", | ||
|  |                     TIMEOUT]) | ||
|  | 
 | ||
|  |         # Second phase | ||
|  |         if i == 0: | ||
|  |             # This is weird. This should not happen twice in a row. | ||
|  |             self.close() | ||
|  |             raise ExceptionPxssh( | ||
|  |                 'Weird error. Got "are you sure" prompt twice.') | ||
|  |         elif i == 1:  # can occur if you have a public key pair set to authenticate. | ||
|  |             # TODO: May NOT be OK if expect() got tricked and matched a false | ||
|  |             # prompt. | ||
|  |             pass | ||
|  |         elif i == 2:  # password prompt again | ||
|  |             # For incorrect passwords, some ssh servers will | ||
|  |             # ask for the password again, others return 'denied' right away. | ||
|  |             # If we get the password prompt again then this means | ||
|  |             # we didn't get the password right the first time. | ||
|  |             self.close() | ||
|  |             raise ExceptionPxssh('password refused') | ||
|  |         elif i == 3:  # permission denied -- password was bad. | ||
|  |             self.close() | ||
|  |             raise ExceptionPxssh('permission denied') | ||
|  |         elif i == 4:  # terminal type again? WTF? | ||
|  |             self.close() | ||
|  |             raise ExceptionPxssh( | ||
|  |                 'Weird error. Got "terminal type" prompt twice.') | ||
|  |         elif i == 5:  # Timeout | ||
|  |             # This is tricky... I presume that we are at the command-line prompt. | ||
|  |             # It may be that the shell prompt was so weird that we couldn't match | ||
|  |             # it. Or it may be that we couldn't log in for some other reason. I | ||
|  |             # can't be sure, but it's safe to guess that we did login because if | ||
|  |             # I presume wrong and we are not logged in then this should be caught | ||
|  |             # later when I try to set the shell prompt. | ||
|  |             pass | ||
|  |         elif i == 6:  # Connection closed by remote host | ||
|  |             self.close() | ||
|  |             raise ExceptionPxssh('connection closed') | ||
|  |         else:  # Unexpected | ||
|  |             self.close() | ||
|  |             raise ExceptionPxssh('unexpected login response') | ||
|  |         if not self.sync_original_prompt(): | ||
|  |             self.close() | ||
|  |             raise ExceptionPxssh('could not synchronize with original prompt') | ||
|  |         # We appear to be in. | ||
|  |         # set shell prompt to something unique. | ||
|  |         if auto_prompt_reset: | ||
|  |             if not self.set_unique_prompt(): | ||
|  |                 self.close() | ||
|  |                 raise ExceptionPxssh( | ||
|  |                     'could not set shell prompt\n' + self.before) | ||
|  |         return True | ||
|  | 
 | ||
|  |     def logout(self): | ||
|  |         """This sends exit to the remote shell. If there are stopped jobs then
 | ||
|  |         this automatically sends exit twice. """
 | ||
|  | 
 | ||
|  |         self.sendline("exit") | ||
|  |         index = self.expect([EOF, "(?i)there are stopped jobs"]) | ||
|  |         if index == 1: | ||
|  |             self.sendline("exit") | ||
|  |             self.expect(EOF) | ||
|  |         self.close() | ||
|  | 
 | ||
|  |     def prompt(self, timeout=20): | ||
|  |         """This matches the shell prompt. This is little more than a short-cut
 | ||
|  |         to the expect() method. This returns True if the shell prompt was | ||
|  |         matched. This returns False if there was a timeout. Note that if you | ||
|  |         called login() with auto_prompt_reset set to False then you should have | ||
|  |         manually set the PROMPT attribute to a regex pattern for matching the | ||
|  |         prompt. """
 | ||
|  | 
 | ||
|  |         i = self.expect([self.PROMPT, TIMEOUT], timeout=timeout) | ||
|  |         if i == 1: | ||
|  |             return False | ||
|  |         return True | ||
|  | 
 | ||
|  |     def set_unique_prompt(self): | ||
|  |         """This sets the remote prompt to something more unique than # or $.
 | ||
|  |         This makes it easier for the prompt() method to match the shell prompt | ||
|  |         unambiguously. This method is called automatically by the login() | ||
|  |         method, but you may want to call it manually if you somehow reset the | ||
|  |         shell prompt. For example, if you 'su' to a different user then you | ||
|  |         will need to manually reset the prompt. This sends shell commands to | ||
|  |         the remote host to set the prompt, so this assumes the remote host is | ||
|  |         ready to receive commands. | ||
|  | 
 | ||
|  |         Alternatively, you may use your own prompt pattern. Just set the PROMPT | ||
|  |         attribute to a regular expression that matches it. In this case you | ||
|  |         should call login() with auto_prompt_reset=False; then set the PROMPT | ||
|  |         attribute. After that the prompt() method will try to match your prompt | ||
|  |         pattern."""
 | ||
|  | 
 | ||
|  |         self.sendline("unset PROMPT_COMMAND") | ||
|  |         self.sendline(self.PROMPT_SET_SH)  # sh-style | ||
|  |         i = self.expect([TIMEOUT, self.PROMPT], timeout=10) | ||
|  |         if i == 0:  # csh-style | ||
|  |             self.sendline(self.PROMPT_SET_CSH) | ||
|  |             i = self.expect([TIMEOUT, self.PROMPT], timeout=10) | ||
|  |             if i == 0: | ||
|  |                 return False | ||
|  |         return True | ||
|  | 
 | ||
|  | # vi:ts=4:sw=4:expandtab:ft=python: |