Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from Exceptions import NotConnected, UnableToConnect, FileNotFound
- import paramiko
- import select
- class SSHClient(object):
- def __init__(self):
- self._ssh_client = paramiko.SSHClient()
- self._ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- self._following_file = False
- # self._ssh_client.load_host_keys()
- def connect(self, address, user, password):
- try:
- self._ssh_client.connect(address, username=user, password=password)
- except:
- raise UnableToConnect()
- def is_connected(self):
- transport = self._ssh_client.get_transport() if self._ssh_client else False
- return (transport or False) and transport.is_active()
- def disconnect(self):
- if self.is_connected:
- self._ssh_client.close()
- def run_command(self, command, with_output=True):
- if not self.is_connected():
- raise NotConnected()
- print(command)
- stdin, stdout, stderr = self._ssh_client.exec_command(command, get_pty=True)
- if with_output:
- return [line.rstrip() for line in stdout.readlines()]
- def read_file(self, path):
- sftp_client = self._ssh_client.open_sftp()
- remote_file = sftp_client.open(path)
- content = remote_file.readlines()
- remote_file.close()
- sftp_client.close()
- return content
- def follow_file(self, path, buffer):
- transport = self._ssh_client.get_transport()
- channel = transport.open_session()
- print("tail -f " + path)
- channel.get_pty()
- channel.exec_command("tail -f " + path)
- self._following_file = True
- last_line = ""
- while self._following_file:
- rl, wl, xl = select.select([channel], [], [], 0.0)
- if len(rl) > 0:
- lines = [x.decode("utf-8").strip('\n') for x in channel.recv(1024).split(b'\n')]
- lines[0] = last_line + lines[0]
- last_line = lines[-1]
- for line in lines[:-1]:
- buffer.append(line)
- channel.close()
- print("Stopped following file {0}".format(path))
- def stop_following_file(self):
- self._following_file = False
- def create_file(self, path, remove_first=False):
- if remove_first:
- self.remove_file(path)
- create_command = "touch " + path
- self.run_command(create_command, with_output=False)
- def remove_file(self, path):
- remove_command = "rm " + path
- self.run_command(remove_command, with_output=False)
- def file_exists(self, path):
- output = self.run_command("test -e {0} || echo file does not exist".format(path))
- print(output)
- if output == ["file does not exist"]:
- return False
- return True
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement