Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from __future__ import print_function
- import os
- from pexpect import TIMEOUT, EOF, spawn
- from pexpect.pxssh import pxssh as orig_pxssh, ExceptionPxssh
- __all__ = ['ExceptionPxssh', 'pxssh']
class ExceptionPxsshHostKey(ExceptionPxssh):
    """Raised by ``pxssh.login`` when the host key is unknown and must not be saved.

    Subclasses ``ExceptionPxssh`` so existing handlers for the generic
    pxssh error keep catching it.
    """
# noinspection PyPep8Naming
class pxssh(orig_pxssh):
    """``pexpect.pxssh`` variant whose ssh command line is configurable.

    A mix of ``pexpect.spawn`` (the caller specifies the command and its
    arguments) and ``pexpect.pxssh`` (the caller specifies ``-o`` options).
    It additionally accepts an ssh config file, and ``login`` can refuse a
    host whose key is not in ``known_hosts`` instead of accepting it.
    """

    def __init__(self, command=None, args=None, timeout=30, maxread=2000,
                 searchwindowsize=None, logfile=None, cwd=None, env=None,
                 ignore_sighup=False, echo=True, preexec_fn=None,
                 encoding=None, codec_errors='strict', dimensions=None,
                 options=None,
                 config_file=None):
        """Store the ssh invocation settings and initialise the base class.

        :param command: executable used for the connection; a falsy value
            selects ``'ssh'``.
        :param args: extra command-line arguments, joined with spaces when
            ``login`` builds the command; a falsy value means none.
        :param preexec_fn: kept on the instance and handed to ``_spawn``
            by ``login``.
        :param dimensions: kept on the instance and handed to ``_spawn``
            by ``login``.
        :param options: mapping rendered as ``-o 'key=value'`` by ``login``;
            a falsy value becomes an empty dict.
        :param config_file: default ssh config file (``-F``) used by
            ``login`` when it is not given one explicitly.

        All remaining parameters are forwarded unchanged to
        ``pexpect.pxssh.__init__``.
        """
        self.ssh_cmd = command if command else 'ssh'
        self.ssh_args = args if args else []
        self.config_file = config_file
        # The base constructor receives neither of these (nor a command);
        # they are passed to _spawn() directly from login().
        self.preexec_fn = preexec_fn
        self.dimensions = dimensions
        orig_pxssh.__init__(
            self,
            timeout=timeout, maxread=maxread,
            searchwindowsize=searchwindowsize, logfile=logfile, cwd=cwd, env=env,
            ignore_sighup=ignore_sighup, echo=echo,
            encoding=encoding, codec_errors=codec_errors,
            options=options if options else {},
        )

    def _build_ssh_command(self, server, username, port, ssh_key, quiet,
                           check_local_ip, config_file):
        """Return the command line string that ``login`` spawns.

        :raises ExceptionPxssh: if ``ssh_key`` is given but does not name an
            existing regular file.
        """
        ssh_options = ''.join(
            " -o '%s=%s'" % (name, value) for name, value in self.options.items())
        if not check_local_ip:
            ssh_options += " -o'NoHostAuthenticationForLocalhost=yes'"
        if self.force_password:
            ssh_options += ' ' + self.SSH_OPTS

        ssh_args = ' '.join(self.ssh_args)
        if quiet:
            ssh_args += ' -q'
        if port is not None:
            ssh_args += ' -p %s' % (str(port))
        if username is not None:
            ssh_args += ' -l %s' % (str(username))

        # The per-call config file wins over the instance default.  Only a
        # non-empty path is emitted, so the command never contains a literal
        # "-F None" or a dangling "-F" that would swallow the next argument.
        effective_config = config_file or self.config_file
        if effective_config:
            ssh_args += ' -F %s' % effective_config

        if ssh_key is not None:
            # os.path.isfile() reports a missing file by returning False, not
            # by raising, so its result has to be tested explicitly.  "~" is
            # expanded for the check only; ssh expands it itself for -i.
            try:
                key_exists = os.path.isfile(os.path.expanduser(ssh_key))
            except (TypeError, ValueError):
                key_exists = False
            if not key_exists:
                raise ExceptionPxssh('private ssh key does not exist')
            ssh_args += ' -i %s' % ssh_key

        return "%s %s %s %s" % (self.ssh_cmd, ssh_options, ssh_args, server)

    # login() keeps the upstream docstring (assigned after the method body),
    # so the additions made here are described in comments instead:
    #   config_file   -- ssh config file for this call (-F); falls back to
    #                    the config_file given to the constructor.
    #   save_host_key -- when False, an unknown host key aborts the login
    #                    with ExceptionPxsshHostKey instead of answering
    #                    "yes" to the confirmation prompt.
    # Returns True on success; every failure closes the session and raises
    # ExceptionPxssh (or its subclass ExceptionPxsshHostKey).
    def login(self, server, username=None, password='', terminal_type='ansi',
              original_prompt=r"[#$]", login_timeout=10, port=None,
              auto_prompt_reset=True, ssh_key=None, quiet=True,
              sync_multiplier=1, check_local_ip=True,
              config_file=None, save_host_key=True):
        cmd = self._build_ssh_command(server, username, port, ssh_key, quiet,
                                      check_local_ip, config_file)

        # This does not distinguish between a remote server 'password' prompt
        # and a local ssh 'passphrase' prompt (for unlocking a private key).
        self._spawn(cmd, preexec_fn=self.preexec_fn, dimensions=self.dimensions)

        # Indices into this list drive the state handling below.
        connect_prompts = ["(?i)are you sure you want to continue connecting",  # 0
                           original_prompt,                                      # 1
                           "(?i)(?:password)|(?:passphrase for key)",            # 2
                           "(?i)permission denied",                              # 3
                           "(?i)terminal type",                                  # 4
                           TIMEOUT]                                              # 5
        # Only the very first expect also watches for a closed connection (6)
        # and EOF (7), and only it is bounded by login_timeout.
        i = self.expect(connect_prompts + ["(?i)connection closed by remote host", EOF],
                        timeout=login_timeout)

        # First phase: answer whatever ssh asks, in the order it may ask it.
        if i == 0:
            # This is what you get if SSH does not have the remote host's
            # public key stored in the 'known_hosts' cache.
            if not save_host_key:
                self.close()
                raise ExceptionPxsshHostKey('Host key not in known_hosts!')
            self.sendline("yes")
            i = self.expect(connect_prompts)
        if i == 2:  # password or passphrase
            self.sendline(password)
            i = self.expect(connect_prompts)
        if i == 4:
            self.sendline(terminal_type)
            i = self.expect(connect_prompts)
        if i == 7:
            self.close()
            raise ExceptionPxssh('Could not establish connection to host')

        # Second phase: seeing any of these prompts now means the login failed.
        failures = {
            # This is weird. This should not happen twice in a row.
            0: 'Weird error. Got "are you sure" prompt twice.',
            # Some ssh servers ask for the password again after a wrong one,
            # others return 'denied' right away.
            2: 'password refused',
            3: 'permission denied',
            4: 'Weird error. Got "terminal type" prompt twice.',
            6: 'connection closed',
        }
        if i in failures:
            self.close()
            raise ExceptionPxssh(failures[i])
        # i == 1: shell prompt seen; can occur with public key authentication.
        #         TODO: may NOT be OK if expect() matched a false prompt.
        # i == 5: timeout.  The shell prompt may simply be too unusual to
        #         match, so assume we are logged in; if that is wrong it is
        #         caught below when synchronising / setting the prompt.
        if i not in (1, 5):  # Unexpected
            self.close()
            raise ExceptionPxssh('unexpected login response')

        if not self.sync_original_prompt(sync_multiplier):
            self.close()
            raise ExceptionPxssh('could not synchronize with original prompt')

        # We appear to be in; set the shell prompt to something unique.
        if auto_prompt_reset:
            if not self.set_unique_prompt():
                self.close()
                raise ExceptionPxssh('could not set shell prompt '
                                     '(received: %r, expected: %r).' % (
                                         self.before, self.PROMPT,))
        return True

    login.__doc__ = orig_pxssh.login.__doc__
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement