Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- defmodule FromPipe do
- #
- # Use a helper script "from_pipe_release" to
- # release/request each line read from the
- # named pipe - effectively implementing a
- # crude backpressure mechanism
- #
- @pipe_name "/tmp/testpipe"
- @from_pipe_release "./from_pipe_release"
- @from_pipe_clean "./from_pipe_clean"
- # * terminate potential zombie OS process
- # * trash potential left over named pipe
- #
- def clean() do
- script_path = Path.expand(@from_pipe_release)
- path = Path.expand(@from_pipe_clean)
- System.cmd(path, [script_path, @pipe_name])
- end
- # Signal releasing OS process to clean up and terminate
- #
- def sig_int(os_pid) do
- System.cmd("kill", ["-INT", Integer.to_string(os_pid)])
- :ok
- end
- # Start the releasing OS process script.
- # Monitor in case process terminates
- #
- def start do
- path = Path.expand(@from_pipe_release)
- args = [@pipe_name]
- port = Port.open({:spawn_executable, path}, [:binary, args: args])
- ref = Port.monitor(port)
- {port, ref}
- end
- # Demonitor and Close and close port
- # Signal OS process to terminate
- #
- def stop({port, ref}) do
- Port.demonitor(ref, [:flush])
- info = Port.info(port, :os_pid)
- Port.close(port)
- # interrupt script reading from named pipe
- case info do
- {:os_pid, os_pid} ->
- sig_int(os_pid)
- _ ->
- :ok
- end
- end
- # Data Processing Loop
- #
- def loop({port, ref} = args) do
- # i.e. ready for next line
- Port.command(port, "ready\n")
- receive do
- {^port, {:data, "quit\n"}} ->
- stop(args)
- {^port, {:data, data}} ->
- IO.puts("port data: #{data}")
- loop(args)
- {:DOWN, ^ref, :port, ^port, _reason} ->
- # script was terminated
- :ok
- msg ->
- IO.puts("other: #{inspect(msg)}")
- loop(args)
- end
- end
- # Start "listening""
- #
- def run() do
- clean()
- loop(start())
- end
- end
Add Comment
Please, Sign In to add comment