module ccb.io.ProcessStream2;

import std.stream;
import std.string : format, toStringz;
import std.c.stdio, std.c.linux.linux, core.sys.posix.sys.wait;
import std.c.stdlib : exit;
import core.stdc.errno;

/// Thrown when the child process has written something to its standard error.
/// The exception message is the text that was read from the error pipe.
class ProcessException : Exception
{
    this(string msg)
    {
        super(msg);
    }
}

/**
 * A Stream connected to a shell command running in a forked child process.
 *
 * Writing to the stream feeds the child's standard input, reading from the
 * stream consumes the child's standard output.  Anything the child writes to
 * its standard error is raised in the parent as a ProcessException the next
 * time the error pipe is polled (from readBlock() or wait()).
 */
class ProcessStream : Stream
{
    // create these aliases so that we don't get confused with the
    // Stream.read, close, etc.
    alias core.sys.posix.unistd.close pipe_close;
    alias core.sys.posix.unistd.read pipe_read;
    alias core.sys.posix.unistd.write pipe_write;
    alias core.sys.posix.sys.wait.wait child_wait;

    /// The shell command this stream was created with.
    const string command;

    /// Seconds part of the select() timeout used when polling the pipes.
    public int TimeoutSeconds = 5;
    /// Microseconds part of the select() timeout used when polling the pipes.
    public int TimeoutMicroseconds = 0;

    private bool iseof = false;

    // Pipe naming is from the parent's point of view:
    //   outputFD - parent writes, child reads it as stdin
    //   inputFD  - child writes to it as stdout, parent reads
    //   errorFD  - child writes to it as stderr, parent reads
    private int[2] outputFD,
                   inputFD,
                   errorFD;
    private const PREAD = 0, PWRITE = 1;
    private int pid; // child process id

    /**
     * Creates the pipes, forks and starts `cmd` in the child.
     *
     * Params:
     *   cmd = command line handed to system() in the child process
     *
     * Throws: Exception if a pipe cannot be created or the fork fails.
     */
    this(string cmd)
    {
        readable = true;
        writeable = true;
        seekable = false;
        command = cmd;
        initialize();
    }

    ~this()
    {
        // close my end of the pipes
        pipe_close(outputFD[PWRITE]);
        pipe_close(inputFD[PREAD]);
        pipe_close(errorFD[PREAD]);
    }

    /// Builds the three pipes, forks, and wires the child's STD* descriptors
    /// to the pipe ends before running the command.
    private void initialize()
    {
        // construct our pipes
        if (0 != pipe(outputFD)) throw new Exception("Failed to create output pipe");
        if (0 != pipe(inputFD)) throw new Exception("Failed to create input pipe");
        if (0 != pipe(errorFD)) throw new Exception("Failed to create error pipe");

        // fork the child process
        pid = fork();
        if (pid == 0)
        {
            // This is the child process

            // close the existing STD* pipes
            pipe_close(STDIN_FILENO);
            pipe_close(STDOUT_FILENO);
            pipe_close(STDERR_FILENO);

            // Make the appropriate ends the STD*
            dup2(outputFD[PREAD], STDIN_FILENO);
            dup2(inputFD[PWRITE], STDOUT_FILENO);
            dup2(errorFD[PWRITE], STDERR_FILENO);

            // Close the parent's ends
            pipe_close(outputFD[PWRITE]);
            pipe_close(inputFD[PREAD]);
            pipe_close(errorFD[PREAD]);

            // Execute the command and terminate.  system() returns a wait
            // status, not an exit code: the exit code lives in bits 8..15.
            // Handing the raw status to exit() keeps only the low 8 bits,
            // which are 0 for every normal termination, so the status has to
            // be decoded before it is passed on.
            int status = system(toStringz(command));
            int code;
            if (WIFEXITED(status))
                code = WEXITSTATUS(status);
            else if (WIFSIGNALED(status))
                code = 128 + WTERMSIG(status); // shell convention for "killed by signal"
            else
                code = 127;                    // system() itself failed
            exit(code);
        }
        else if (pid < 0)
            throw new Exception("Fork failed");

        // This is the parent process
        // These are being used by the child
        pipe_close(outputFD[PREAD]);
        pipe_close(inputFD[PWRITE]);
        pipe_close(errorFD[PWRITE]);
    }

    /// True once a read hit end-of-file on the child's stdout, or once
    /// polling for data gave up (see hasData()).
    override bool eof()
    {
        return iseof;
    }

    /**
     * Reads up to `size` bytes of the child's standard output.
     *
     * Returns: the number of bytes read; 0 when no data became available
     *          within the timeout or the child closed its stdout.
     * Throws:  Exception on a read or select failure, ProcessException when
     *          the child has written to stderr.
     */
    override size_t readBlock(void* buffer, size_t size)
    {
        if (hasData(inputFD[PREAD]))
        {
            long ret = pipe_read(inputFD[PREAD], buffer, size);
            if (ret == -1)
                throw new Exception(format("Error reading data: %s", errno));
            // A zero-length result for a non-empty request means the write
            // end was closed; record it so callers looping on eof() stop.
            if (ret == 0 && size > 0)
                iseof = true;
            return cast(size_t) ret;
        }
        return 0;
    }

    /**
     * Writes up to `size` bytes to the child's standard input.
     *
     * Returns: the number of bytes actually written.
     * Throws:  Exception when the underlying write fails (previously the -1
     *          result was returned to the caller as a huge byte count).
     */
    override size_t writeBlock(const void* buffer, size_t size)
    {
        long ret = pipe_write(outputFD[PWRITE], buffer, size);
        if (ret == -1)
            throw new Exception(format("Error writing data: %s", errno));
        return cast(size_t) ret;
    }

    /// Always throws: a process pipe cannot be repositioned.
    override ulong seek(long offset, SeekPos whence)
    {
        throw new Exception("ProcessStream does not support seeking.");
    }

    /**
     * Waits for the child process to finish and returns its exit code.
     *
     * The child is reaped with waitpid() on its own pid, outside of any
     * assert so that the call is not compiled away in release builds.
     *
     * Throws: Exception if the child cannot be waited for or did not exit
     *         normally, ProcessException if the child left output on stderr.
     */
    public int wait()
    {
        int childStatus;
        int reaped;
        do
        {
            reaped = waitpid(pid, &childStatus, 0);
        } while (reaped == -1 && errno == EINTR); // interrupted by a signal: retry

        if (reaped <= 0)
            throw new Exception(format("Failed to wait for child process: %s", errno));
        if (!WIFEXITED(childStatus))
            throw new Exception("Child process did not exit normally");

        checkError();
        return WEXITSTATUS(childStatus);
    }

    /**
     * Polls `fd` and the error pipe with select(), using the configured timeout.
     *
     * Returns: true when `fd` is readable.  In every other non-throwing case
     *          (timeout, or only the error pipe was readable) the stream is
     *          flagged as eof and false is returned.
     * Throws:  Exception on a select failure, ProcessException when the
     *          child has written to stderr.
     */
    protected bool hasData(int fd)
    {
        auto errPipe = errorFD[PREAD];
        auto highest = fd > errPipe ? fd + 1 : errPipe + 1;

        fd_set checkRead;
        FD_ZERO(&checkRead);
        FD_SET(fd, &checkRead);
        FD_SET(errPipe, &checkRead);

        timeval tv;
        tv.tv_sec = TimeoutSeconds;
        tv.tv_usec = TimeoutMicroseconds;

        auto r = select(highest, &checkRead, null, null, &tv);
        if (r > 0)
        {
            // check for something on the error pipe
            if (FD_ISSET(errPipe, &checkRead))
                handleError();
            // check the input pipe
            if (FD_ISSET(fd, &checkRead))
                return true;
        }
        else if (r == -1)
        {
            throw new Exception("select error");
        }

        iseof = true;
        return false;
    }

    /// Polls only the error pipe (with the configured timeout) and raises
    /// whatever the child wrote there.
    protected void checkError()
    {
        auto errPipe = errorFD[PREAD];
        auto highest = errPipe + 1;

        fd_set checkRead;
        FD_ZERO(&checkRead);
        FD_SET(errPipe, &checkRead);

        timeval tv;
        tv.tv_sec = TimeoutSeconds;
        tv.tv_usec = TimeoutMicroseconds;

        auto r = select(highest, &checkRead, null, null, &tv);
        if (r > 0)
        {
            // check for something on the error pipe
            if (FD_ISSET(errPipe, &checkRead))
                handleError();
        }
        else if (r == -1)
        {
            throw new Exception("select error");
        }
    }

    /**
     * Drains the error pipe and throws its contents as a ProcessException.
     *
     * Reads in buffer-sized chunks until a short read.  Nothing is thrown
     * when the pipe yielded no bytes at all (plain end-of-file).
     *
     * Throws: Exception when the read fails, ProcessException when any text
     *         was collected.
     */
    protected void handleError()
    {
        char[] err;
        char[100] buf;
        long len; // read() returns ssize_t, which does not fit an int on 64-bit
        do
        {
            len = pipe_read(errorFD[PREAD], buf.ptr, buf.length);
            if (len < 0)
                throw new Exception("Bad pipe read");
            err ~= buf[0 .. cast(size_t) len];
        } while (cast(size_t) len == buf.length);

        // Decide on what was collected, not on the size of the last read:
        // output that is an exact multiple of the buffer size ends with a
        // zero-length read and used to be silently discarded.
        if (err.length)
            throw new ProcessException(err.idup);
    }
}