Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #python3.6
- from subprocess import Popen, PIPE
- from queue import Queue, Empty
- from threading import Thread
class customPopen(Popen):
    """Popen variant with line-oriented, timeout-aware I/O to the child.

    ``stdin`` and ``stdout`` default to ``PIPE`` unless the caller passes
    them explicitly.  When stdout is a pipe, a background thread reads it
    line by line and puts every non-blank line, decoded to ``str``, on
    ``stdout_lines_queue``; ``custom_readline`` takes lines from that queue.
    Text is encoded on write and decoded on read, i.e. the pipes are used
    in binary mode.
    """

    # TODO: give access to stderr as well (e.g. default it to PIPE too).

    def __init__(self, *args, **kwargs):
        """Start the child process and, if stdout is piped, the reader thread.

        Accepts the same arguments as ``subprocess.Popen``.
        """
        kwargs.setdefault('stdin', PIPE)
        kwargs.setdefault('stdout', PIPE)
        super().__init__(*args, **kwargs)

        self.stdout_lines_queue = Queue()
        self.stdout_reader_thread = None
        # The caller may have redirected stdout elsewhere; then there is
        # no pipe to read and no reader thread is needed.
        if self.stdout is not None:
            self.stdout_reader_thread = Thread(
                target=self._read_stdout_lines, args=(self.stdout,))
            self.stdout_reader_thread.start()

    def _read_stdout_lines(self, stream):
        """Queue every non-blank line of *stream* until end of file."""
        # Read until EOF rather than until ``returncode`` is set:
        # ``returncode`` only changes when someone polls or waits, and
        # output still buffered in the pipe when the child exits must not
        # be dropped.
        while True:
            raw = stream.readline()
            if not raw:
                break
            # Replace undecodable bytes instead of letting one bad byte
            # kill the reader thread silently.
            line = raw.decode(errors='replace')
            if line.strip():
                self.stdout_lines_queue.put(line)

    def custom_writeline(self, x: str):
        """Send *x* to the child's stdin as one line and flush.

        A trailing newline is appended when *x* does not already end with
        one; an empty string therefore sends a bare newline.
        """
        if not x.endswith('\n'):
            x += '\n'
        self.stdin.write(x.encode())
        self.stdin.flush()

    def custom_readline(self, timeout=1):
        """Return the next non-blank stdout line of the child.

        Waits up to *timeout* seconds (``None`` waits indefinitely) and
        raises ``TimeoutError`` when no line became available in time.
        """
        try:
            return self.stdout_lines_queue.get(timeout=timeout)
        except Empty:
            raise TimeoutError(
                f'no output line received within {timeout} s') from None

    def _close_stdin(self):
        """Close the child's stdin pipe, if there is one."""
        if self.stdin is not None:
            self.stdin.close()

    def terminate(self):
        """Close the child's stdin, then terminate it via ``Popen.terminate``."""
        self._close_stdin()
        super().terminate()

    def kill(self):
        """Close the child's stdin, then kill it via ``Popen.kill``."""
        self._close_stdin()
        super().kill()

    def wait(self, timeout=None):
        """Wait for the child to exit, then for the reader thread to finish.

        *timeout* is applied to each of the two waits separately.  Returns
        the child's return code, like ``Popen.wait``; a timeout while
        waiting for the child propagates from ``Popen.wait`` unchanged.
        """
        returncode = super().wait(timeout=timeout)
        if self.stdout_reader_thread is not None:
            self.stdout_reader_thread.join(timeout=timeout)
        return returncode
Add Comment
Please, Sign In to add comment