JNewt

Process Stream in D

Sep 8th, 2011
354
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
D 4.75 KB | None | 0 0
  1. module ccb.io.ProcessStream2;
  2.  
  3. import std.stream,
  4.     std.string : format, toStringz;
  5. import std.c.stdio,
  6.     std.c.linux.linux,
  7.     core.sys.posix.sys.wait;
  8. import std.c.stdlib: exit;
  9. import core.stdc.errno;
  10.  
  /**
   * Exception type carrying a plain text message.
   *
   * Within this module it is thrown with the text that the child process
   * wrote to its standard error pipe.
   */
  class ProcessException : Exception
  {
      /// Builds the exception; `msg` is passed unchanged to `Exception`.
      this(string msg) { super(msg); }
  }
  18.  
  /**
   * A non-seekable Stream connected to a shell command running in a forked
   * child process.
   *
   * Data written to the stream goes to the child's standard input; data read
   * from the stream comes from the child's standard output. Anything the child
   * writes to standard error is collected and re-thrown in the parent as a
   * ProcessException.
   */
  class ProcessStream : Stream {

      // Aliases so the POSIX calls are not confused with Stream.read, close, etc.
      alias core.sys.posix.unistd.close pipe_close;
      alias core.sys.posix.unistd.read pipe_read;
      alias core.sys.posix.unistd.write pipe_write;

      alias core.sys.posix.sys.wait.wait child_wait;

      /// The shell command line executed in the child process.
      const string command;

      /// Timeout applied to every select() call made while polling the pipes.
      public int TimeoutSeconds       = 5,
                  TimeoutMicroseconds = 0;

      // Reported by eof(). Set when a poll times out or a read hits end-of-file.
      private bool iseof = false;

      // Pipe pairs, named from the parent's point of view. They start at -1 so
      // that cleanup never closes a descriptor this object did not open.
      private int[2] outputFD = [-1, -1];   // parent writes -> child stdin
      private int[2] inputFD  = [-1, -1];   // child stdout  -> parent reads
      private int[2] errorFD  = [-1, -1];   // child stderr  -> parent reads
      private const PREAD = 0,
                    PWRITE = 1;

      private int pid = -1;         // child process id
      private bool reaped = false;  // true once wait() has collected the child
      private int exitCode = 0;     // exit code cached by wait()

      /**
       * Starts `cmd` through the shell and connects the stream to it.
       *
       * Throws: Exception if a pipe cannot be created or fork() fails.
       */
      this(string cmd)
      {
          readable = true;
          writeable = true;
          seekable = false;

          command = cmd;
          initialize();
      }

      ~this()
      {
          // close my end of the pipes
          closeFD(outputFD[PWRITE]);
          closeFD(inputFD[PREAD]);
          closeFD(errorFD[PREAD]);
      }

      /// Closes `fd` if it is open and marks it closed, so a second call is a no-op.
      private void closeFD(ref int fd)
      {
          if (fd >= 0)
          {
              pipe_close(fd);
              fd = -1;
          }
      }

      /// Closes both ends of a pipe pair.
      private void closePair(ref int[2] pair)
      {
          closeFD(pair[PREAD]);
          closeFD(pair[PWRITE]);
      }

      /// Child-side close that never touches the freshly installed stdio descriptors.
      private void closeInChild(int fd)
      {
          if (fd > STDERR_FILENO)
              pipe_close(fd);
      }

      /// Creates the three pipes, forks, and wires the child's stdio to them.
      private void initialize()
      {
          // construct our pipes, releasing whatever was already opened on failure
          if (0 != pipe(outputFD))
              throw new Exception("Failed to create output pipe");
          if (0 != pipe(inputFD))
          {
              closePair(outputFD);
              throw new Exception("Failed to create input pipe");
          }
          if (0 != pipe(errorFD))
          {
              closePair(outputFD);
              closePair(inputFD);
              throw new Exception("Failed to create error pipe");
          }

          // fork the child process
          pid = fork();
          if (pid == 0)
          {
              // This is the child process

              // dup2 closes the target descriptor itself, so no explicit
              // close of STD* is needed beforehand.
              if (dup2(outputFD[PREAD], STDIN_FILENO) < 0
                  || dup2(inputFD[PWRITE], STDOUT_FILENO) < 0
                  || dup2(errorFD[PWRITE], STDERR_FILENO) < 0)
                  exit(127);

              // Drop every original pipe descriptor: the parent's ends, and
              // the child's ends that now live on as stdin/stdout/stderr.
              closeInChild(outputFD[PREAD]);
              closeInChild(outputFD[PWRITE]);
              closeInChild(inputFD[PREAD]);
              closeInChild(inputFD[PWRITE]);
              closeInChild(errorFD[PREAD]);
              closeInChild(errorFD[PWRITE]);

              // system() returns a wait status, not an exit code. Passing it
              // straight to exit() truncates e.g. 0x0100 (exit code 1) to 0,
              // so decode it first.
              int status = system(toStringz(command));
              int code = 127;
              if (status != -1)
              {
                  if (WIFEXITED(status))
                      code = WEXITSTATUS(status);
                  else if (WIFSIGNALED(status))
                      code = 128 + WTERMSIG(status);
              }
              exit(code);

          } else if (pid < 0) {
              closePair(outputFD);
              closePair(inputFD);
              closePair(errorFD);
              throw new Exception("Fork failed");
          }

          // This is the parent process

          closeFD(outputFD[PREAD]); // These are being used by the child
          closeFD(inputFD[PWRITE]);
          closeFD(errorFD[PWRITE]);
      }

      /// True after a poll timed out or a read returned end-of-file.
      override bool eof() {
          return iseof;
      }

      /**
       * Reads up to `size` bytes of the child's standard output.
       *
       * Returns: the number of bytes read; 0 when no data arrived within the
       *          timeout or the child closed its output.
       * Throws: Exception on a read or select failure; ProcessException if
       *         the child wrote to standard error.
       */
      override size_t readBlock(void* buffer, size_t size)
      {
          if (!hasData(inputFD[PREAD]))
              return 0;

          auto ret = pipe_read(inputFD[PREAD], buffer, size);
          if (ret < 0)
              throw new Exception(format("Error reading data: %s", errno));

          // A zero-length read on a readable pipe means the writer closed it.
          // A successful read clears an earlier timeout-induced eof flag.
          if (size > 0)
              iseof = (ret == 0);
          return cast(size_t) ret;
      }

      /**
       * Writes up to `size` bytes to the child's standard input.
       *
       * Returns: the number of bytes actually written.
       * Throws: Exception if write() fails, instead of returning the -1
       *         failure value converted to a huge size_t.
       */
      override size_t writeBlock(const void* buffer, size_t size)
      {
          auto ret = pipe_write(outputFD[PWRITE], buffer, size);
          if (ret < 0)
              throw new Exception(format("Error writing data: %s", errno));
          return cast(size_t) ret;
      }

      /// Always throws: pipes cannot be repositioned.
      override ulong seek(long offset, SeekPos whence) {
          throw new Exception("ProcessStream does not support seeking.");
      }

      /**
       * Waits for the child process to finish and returns its exit code.
       *
       * The exit code is cached, so calling this again after a successful
       * wait returns the same value. The error pipe is checked on every call.
       *
       * Throws: ProcessException if the child cannot be waited for, did not
       *         exit normally, or wrote to standard error.
       */
      public int wait()
      {
          if (!reaped)
          {
              // The wait must not live inside assert(): asserts are compiled
              // out in -release builds, which would skip the call entirely.
              int childStatus;
              int r;
              do {
                  r = waitpid(pid, &childStatus, 0);
              } while (r == -1 && errno == EINTR);

              if (r != pid)
                  throw new ProcessException(
                      format("Failed to wait for child process: %s", errno));
              if (!WIFEXITED(childStatus))
                  throw new ProcessException("Child process did not exit normally");

              exitCode = WEXITSTATUS(childStatus);
              reaped = true;
          }

          checkError();

          return exitCode;
      }

      /**
       * Runs select() for reading on `fds` with the configured timeout,
       * retrying when interrupted by a signal.
       *
       * Returns: the select() result (0 on timeout); `ready` holds the
       *          descriptors that are readable.
       * Throws: Exception on any select failure other than EINTR.
       */
      private int pollReadable(const(int)[] fds, ref fd_set ready)
      {
          int highest = 0;
          foreach (fd; fds)
              if (fd + 1 > highest)
                  highest = fd + 1;

          for (;;)
          {
              // select may modify both the set and the timeout, so rebuild
              // them on every attempt.
              FD_ZERO(&ready);
              foreach (fd; fds)
                  FD_SET(fd, &ready);

              timeval tv;
              tv.tv_sec = TimeoutSeconds;
              tv.tv_usec = TimeoutMicroseconds;

              auto r = select(highest, &ready, null, null, &tv);
              if (r >= 0)
                  return r;
              if (errno != EINTR)
                  throw new Exception("select error");
          }
      }

      /**
       * Polls `fd` and the error pipe for readable data.
       *
       * Returns: true if `fd` is readable. Otherwise marks the stream as eof
       *          and returns false; this includes the timeout case.
       * Throws: ProcessException if the child wrote to standard error.
       */
      protected bool hasData(int fd)
      {
          auto errPipe = errorFD[PREAD];

          int[2] watched;
          watched[0] = fd;
          watched[1] = errPipe;

          fd_set checkRead;
          if (pollReadable(watched[], checkRead) > 0)
          {
              // check for something on the error pipe
              if (FD_ISSET(errPipe, &checkRead))
                  handleError();

              // check the input pipe
              if (FD_ISSET(fd, &checkRead))
                  return true;
          }
          iseof = true;
          return false;
      }

      /**
       * Polls only the error pipe and raises whatever the child wrote to it.
       *
       * Throws: ProcessException if the child wrote to standard error.
       */
      protected void checkError()
      {
          int[1] watched;
          watched[0] = errorFD[PREAD];

          fd_set checkRead;
          if (pollReadable(watched[], checkRead) > 0)
          {
              // check for something on the error pipe
              if (FD_ISSET(watched[0], &checkRead))
                  handleError();
          }
      }

      /**
       * Drains the currently available error output and throws it.
       *
       * Reading stops at end-of-file or at the first short read, so this does
       * not block waiting for error output that has not been written yet.
       *
       * Throws: ProcessException carrying the collected text if any was read;
       *         Exception if the read itself fails.
       */
      protected void handleError()
      {
          char[] err;
          char[100] buf;

          for (;;)
          {
              auto len = pipe_read(errorFD[PREAD], buf.ptr, buf.length);
              if (len < 0)
                  throw new Exception("Bad pipe read");
              if (len == 0)
                  break;

              auto got = cast(size_t) len;
              err ~= buf[0 .. got];
              if (got < buf.length)
                  break;
          }

          // Decide on the accumulated text, not on the last read length:
          // output that is an exact multiple of the buffer size ends with a
          // zero-length read and must still be reported.
          if (err.length > 0)
              throw new ProcessException(err.idup);
      }
  }
Add Comment
Please, Sign In to add comment