Pastebin launched a little side project called VERYVIRAL.com, check it out ;-) Want more features on Pastebin? Sign Up, it's FREE!
Guest

Process Stream in D

By: JNewt on Sep 8th, 2011  |  syntax: D  |  size: 4.75 KB  |  views: 137  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  1. module ccb.io.ProcessStream2;
  2.  
  3. import std.stream,
  4.         std.string : format, toStringz;
  5. import std.c.stdio,
  6.         std.c.linux.linux,
  7.         core.sys.posix.sys.wait;
  8. import std.c.stdlib: exit;
  9. import core.stdc.errno;
  10.  
  11. ///
  12. class ProcessException : Exception {
  13.         this(string msg)
  14.         {
  15.                 super(msg);
  16.         }
  17. }
  18.  
  19. ///
  20. class ProcessStream : Stream {
  21.        
  22.         // create these aliases so that we don't get confused with the Stream.read, close, etc.
  23.         alias core.sys.posix.unistd.close pipe_close;
  24.         alias core.sys.posix.unistd.read pipe_read;
  25.         alias core.sys.posix.unistd.write pipe_write;
  26.  
  27.         alias core.sys.posix.sys.wait.wait child_wait;
  28.  
  29.         const string command;                                   ///
  30.  
  31.         public int TimeoutSeconds               = 5,    ///
  32.                                 TimeoutMicroseconds = 0;        ///
  33.  
  34.         private bool iseof = false;
  35.  
  36.         private int[2] outputFD,
  37.                                         inputFD,
  38.                                         errorFD;
  39.         private const PREAD = 0,
  40.                                   PWRITE = 1;
  41.        
  42.         private int pid;                // child process id
  43.  
  44.         ///
  45.         this(string cmd)
  46.         {
  47.                 readable = true;
  48.                 writeable = true;
  49.                 seekable = false;
  50.  
  51.                 command = cmd;
  52.                 initialize();
  53.         }
  54.  
  55.         ~this()
  56.         {
  57.                 // close my end of the pipes
  58.                 pipe_close(outputFD[PWRITE]);
  59.                 pipe_close(inputFD[PREAD]);
  60.                 pipe_close(errorFD[PREAD]);
  61.         }
  62.  
  63.         private void initialize()
  64.         {
  65.                 // construct our pipes
  66.                 if (0 != pipe(outputFD))
  67.                         throw new Exception("Failed to create output pipe");
  68.                 if (0 != pipe(inputFD))
  69.                         throw new Exception("Failed to create input pipe");
  70.                 if (0 != pipe(errorFD))
  71.                         throw new Exception("Failed to create error pipe");
  72.  
  73.                 // fork the child process
  74.                 pid = fork();
  75.                 if (pid == 0)
  76.                 {
  77.                         // This is the child process
  78.                
  79.                         // close the existing STD* pipes
  80.                         pipe_close(STDIN_FILENO);
  81.                         pipe_close(STDOUT_FILENO);
  82.                         pipe_close(STDERR_FILENO);
  83.  
  84.                         // Make the appropriate ends the STD*
  85.                         dup2(outputFD[PREAD], STDIN_FILENO);
  86.                         dup2(inputFD[PWRITE], STDOUT_FILENO);
  87.                         dup2(errorFD[PWRITE], STDERR_FILENO);
  88.  
  89.                         pipe_close(outputFD[PWRITE]); // Close the parent's ends
  90.                         pipe_close(inputFD[PREAD]);
  91.                         pipe_close(errorFD[PREAD]);
  92.  
  93.                         // execute the command and terminate
  94.                         exit(system(toStringz(command)));
  95.  
  96.                 } else if (pid < 0)
  97.                         throw new Exception("Fork failed");
  98.  
  99.                 // This is the parent process
  100.  
  101.                 pipe_close(outputFD[PREAD]); // These are being used by the child
  102.                 pipe_close(inputFD[PWRITE]);
  103.                 pipe_close(errorFD[PWRITE]);
  104.         }
  105.  
  106.         override bool eof() {
  107.                 return iseof;
  108.         }
  109.  
  110.         override size_t readBlock(void* buffer, size_t size)
  111.         {
  112.                 if (hasData(inputFD[PREAD]))
  113.                 {
  114.                         long ret = pipe_read(inputFD[PREAD], buffer, size);
  115.                         if (ret == -1)
  116.                                 throw new Exception(format("Error reading data: %s", errno));
  117.                         return cast(size_t)ret;
  118.                 }
  119.                 return 0;
  120.         }
  121.  
  122.         override size_t writeBlock(const void* buffer, size_t size)
  123.         {
  124.                 return pipe_write(outputFD[PWRITE], buffer, size);
  125.         }
  126.  
  127.         override ulong seek(long offset, SeekPos whence) {
  128.                 throw new Exception("ProcessStream does not support seeking.");
  129.         }
  130.  
  131.         /**
  132.          * Waits for the child process to finish and returns its
  133.          *  exit code.
  134.          *
  135.          * BUG: This always returns 0 at the moment, not sure why.
  136.          */
  137.         public int wait()
  138.         {
  139.                 int childStatus;
  140.                 assert(child_wait(&childStatus) > 0);
  141.                 assert(WIFEXITED(childStatus));
  142.  
  143.                 checkError();
  144.  
  145.                 return WEXITSTATUS(childStatus);
  146.         }
  147.  
  148.         protected bool hasData(int fd)
  149.         {
  150.                 auto errPipe = errorFD[PREAD];
  151.                 auto highest = fd > errPipe ? fd + 1 : errPipe + 1;
  152.  
  153.                 fd_set checkRead;
  154.                 FD_ZERO(&checkRead);
  155.  
  156.                 FD_SET(fd, &checkRead);
  157.                 FD_SET(errPipe, &checkRead);
  158.  
  159.                 timeval tv;
  160.                 tv.tv_sec = TimeoutSeconds;
  161.                 tv.tv_usec = TimeoutMicroseconds;
  162.                 auto r = select(highest, &checkRead, null, null, &tv);
  163.  
  164.                 if (r > 0)
  165.                 {
  166.                         // check for something on the error pipe
  167.                         if (FD_ISSET(errPipe, &checkRead))
  168.                                 handleError();
  169.  
  170.                         // check the input pipe
  171.                         if (FD_ISSET(fd, &checkRead))
  172.                                 return true;
  173.  
  174.                 } else if (r == -1) {
  175.                         throw new Exception("select error");
  176.                 }
  177.                 iseof = true;
  178.                 return false;
  179.         }
  180.  
  181.         protected void checkError()
  182.         {
  183.                 auto errPipe = errorFD[PREAD];
  184.                 auto highest = errPipe + 1;
  185.  
  186.                 fd_set checkRead;
  187.                 FD_ZERO(&checkRead);
  188.                 FD_SET(errPipe, &checkRead);
  189.  
  190.                 timeval tv;
  191.                 tv.tv_sec = TimeoutSeconds;
  192.                 tv.tv_usec = TimeoutMicroseconds;
  193.                 auto r = select(highest, &checkRead, null, null, &tv);
  194.  
  195.                 if (r > 0)
  196.                 {
  197.                         // check for something on the error pipe
  198.                         if (FD_ISSET(errPipe, &checkRead))
  199.                                 handleError();
  200.  
  201.                 } else if (r == -1) {
  202.                         throw new Exception("select error");
  203.                 }
  204.         }
  205.  
  206.         protected void handleError()
  207.         {
  208.                 char[] err;
  209.                 char[100] buf;
  210.                 int len;
  211.                 do {
  212.                         len = pipe_read(errorFD[PREAD], buf.ptr, 100);
  213.                         if (len == 0)
  214.                                 break;
  215.                         else if (len < 0)
  216.                                 throw new Exception("Bad pipe read");
  217.                        
  218.                         err ~= buf[0..len].dup;
  219.                 } while (len == 100);
  220.        
  221.                 if (len)
  222.                         throw new ProcessException(err.idup);
  223.         }
  224. }