Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- def pipe_example():
- inp = Pipemonoid(["printf", "hello world"]).stdout_to_pipe()
- gzip = Pipemonoid(["gzip", "-c"]).stdout_to_pipe()
- gunzip = Pipemonoid(["gunzip", "-c"]).stdout_to_pipe()
- base32 = Pipemonoid(["base32"])
- test_pipe = inp + gzip + gunzip + base32
- (stdout, stderr) = test_pipe.stdout_to_pipe().execute()
- print stdout
- class Pipemonoid(object):
- def __init__(self, *args, **kwargs):
- self._delegates = [[args, kwargs]]
- self._current = None
- self._temps = []
- @property
- def last_process(self):
- """
- Internal use, do not touch
- """
- return self._delegates[-1]
- def add_process(self, process):
- """
- Internal use, do not touch
- """
- self._delegates.append(process)
- return self
- def set_named_parameter(self, key, val):
- """
- Set a arbitrary named parameter for the
- current top pipe
- """
- last = self.last_process
- xs = last[1]
- xs[key] = val
- last[1] = xs
- def bufsize(self, bufsize):
- """
- Set the buffer size
- """
- self.set_named_parameter("bufsize",bufsize)
- return self
- def __add__(self, other):
- return self.pipe(other)
- def stdout_to_pipe(self):
- """
- Send the stdout down the pipeline
- """
- self.set_named_parameter("stdout", subprocess.PIPE)
- return self
- # Realize the last entry, internal usage only!
- def realize(self):
- """
- Realize the last entry of the pipeline
- This will instantiate the pipe, and opens
- a subprocess
- """
- last = self.last_process
- args = last[0]
- kwargs = last[1]
- self._current = subprocess.Popen(*args, **kwargs)
- return self
- def stderr_to_pipe(self):
- """
- Send stderr to the next pipe
- """
- self.set_named_parameter("stderr", subprocess.PIPE)
- return self
- def pipe(self, nextprocess):
- """
- Concatenate two pipes together. This is
- the binary operation of the pipe monoid.
- """
- self.realize()
- self._delegates.append(nextprocess.last_process)
- self.set_named_parameter("stdin", self._current.stdout)
- return self
- def stdout_to_temp(self):
- """
- Redirect stdout to a named tempfile
- returns the name
- """
- tmp = tempfile.NamedTemporaryFile(delete=False)
- self._temps.append(tmp)
- self.set_named_parameter("stdout", tmp)
- return tmp
- def stderr_to_temp(self):
- """
- redirect stderr to a named temp file
- returns the name
- """
- tmp = tempfile.NamedTemporaryFile(delete = False)
- self._temps.append(tmp)
- self.set_named_parameter("stderr", tmp)
- return tmp
- def close(self):
- """
- Closes all the open file handles
- """
- for tmp in self._temps:
- tmp.close()
- if os.path.exists(tmp.name):
- os.unlink(tmp.name)
- def stdout_to_file(self, fp):
- """
- Redirect stdout to a file
- """
- self.set_named_parameter("stdout", fp)
- return self
- def stderr_to_file(self, fp):
- """
- Redirect stderr to a file
- """
- self.set_named_parameter("stderr", fp)
- return self
- def stdin_from_file(self, fp):
- """
- Redirect a file to stdin
- """
- self.set_named_parameter("stdin", fp)
- return self
- def execute(self, stdin = None):
- """
- Execute the build up monoidal action
- """
- self.realize()
- (stdout, stderr) = self._current.communicate(stdin)
- return (stdout, stderr)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement