Advertisement
Guest User

Untitled

a guest
Nov 3rd, 2012
85
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 9.19 KB | None | 0 0
  1.  
  2. package edu.nitin.testcodes;
  3.  
  4. import java.io.InputStream;
  5. import java.io.OutputStream;
  6.  
  7. public interface ProcessExecutor {
  8.     public void start() throws ProcessExecutionException;
  9.     public InputStream getStandardOutput() throws ProcessExecutionException;
  10.     public InputStream getStandardError() throws ProcessExecutionException;
  11.     public OutputStream getStandardInput() throws ProcessExecutionException;
  12.     public int waitFor() throws ProcessExecutionException;
  13.     public void stop() throws ProcessExecutionException;
  14.     public int exitCode() throws ProcessExecutionException;
  15. }
  16. //====
  17. package edu.nitin.testcodes;
  18.  
  19. public interface Destructible {
  20.  
  21.     public void destroy();
  22. }
  23.  
  24. //====
  25. package edu.nitin.testcodes;
  26.  
  27. public interface DestrutableRunner extends Runnable, Destructible {
  28.    
  29. }
  30.  
  31. //====
  32. package edu.nitin.testcodes;
  33.  
  34. import java.io.InputStream;
  35.  
  36. public interface InputStreamRunner extends DestrutableRunner {
  37.  
  38.     public void setInputStream(final InputStream inputStream);
  39. }
  40. //===
  41. package edu.nitin.testcodes;
  42.  
  43. import java.io.*;
  44. import java.util.ArrayList;
  45. import java.util.List;
  46.  
  47. public class FlumeProcessExecutor implements ProcessExecutor {
  48.  
  49.     private Process process;
  50.     private final List<String> command;
  51.     private InputStream standardOutput;
  52.     private InputStream standardError;
  53.     private OutputStream standardInput;
  54.     private InputStreamRunner standardOutputRunner;
  55.     private InputStreamRunner standardErrorRunner;
  56.  
  57.     public FlumeProcessExecutor(final String... command) throws ProcessExecutionException {
  58.         this.command = new ArrayList<String>(command.length);
  59.         for (final String arg : command) {
  60.             this.command.add(arg);
  61.         }
  62.     }
  63.    
  64.     public FlumeProcessExecutor(final List<String> command) throws ProcessExecutionException {
  65.         this(command, null, null);
  66.     }
  67.  
  68.     public FlumeProcessExecutor(final List<String> command,
  69.             final InputStreamRunner standardErrorRunner) throws ProcessExecutionException {
  70.         this(command, standardErrorRunner, null);
  71.     }
  72.  
  73.     public FlumeProcessExecutor(final List<String> command,
  74.             final InputStreamRunner standardErrorRunner,
  75.             final InputStreamRunner standardOutputRunner) throws ProcessExecutionException {
  76.         if (command == null) {
  77.             throw new ProcessExecutionException("Command can not be null");
  78.         }
  79.         this.command = command;
  80.         this.standardErrorRunner = standardErrorRunner;
  81.         this.standardOutputRunner = standardOutputRunner;
  82.     }
  83.  
  84.     public void start() throws ProcessExecutionException {
  85.         //System.err.println(command);
  86.         try {
  87.             process = new ProcessBuilder(command).start();
  88.         } catch (final IOException ioe) {
  89.             throw new ProcessExecutionException("Could not build process", ioe);
  90.         }
  91.         //System.err.println(process);
  92.         standardInput = process.getOutputStream();
  93.  
  94.         //Pipe standard out
  95.         if (standardOutputRunner == null) {
  96.             final PipedOutputStream pipedOutputStream = new PipedOutputStream();
  97.             standardOutput = pipeOutput(pipedOutputStream);
  98.             standardOutputRunner = attachOutput(pipedOutputStream);
  99.         }
  100.         standardOutputRunner.setInputStream(process.getInputStream());
  101.         final Thread standardOutputThread = new Thread(standardOutputRunner, "stdout");
  102.         standardOutputThread.start();
  103.  
  104.         //Pipe standard err
  105.         if (standardErrorRunner == null) {
  106.             final PipedOutputStream pipedOutputStream = new PipedOutputStream();
  107.             standardError = pipeOutput(pipedOutputStream);
  108.             standardErrorRunner = attachOutput(pipedOutputStream);
  109.         }
  110.         standardErrorRunner.setInputStream(process.getErrorStream());
  111.         final Thread standardErrThread = new Thread(standardErrorRunner, "stderr");
  112.         standardErrThread.start();
  113.        
  114.        
  115.     }
  116.  
  117.     private PipedInputStream pipeOutput(final PipedOutputStream pipedOutputStream) throws ProcessExecutionException {
  118.         final PipedInputStream pipedInputStream;
  119.         try {
  120.             pipedInputStream = new PipedInputStream(pipedOutputStream);
  121.         } catch (final IOException ioe) {
  122.             throw new ProcessExecutionException("Could not pipe streams", ioe);
  123.         }
  124.         return pipedInputStream;
  125.     }
  126.  
  127.     private InputStreamRunner attachOutput(final PipedOutputStream pipedOutputStream) {
  128.         final InputStreamRunner ioAttachRunner = new IOAttachRunner(pipedOutputStream);
  129.         return ioAttachRunner;
  130.     }
  131.  
  132.     public int waitFor() throws ProcessExecutionException {
  133.         try {
  134.             return process.waitFor();
  135.         }
  136.         catch(final InterruptedException ie) {
  137.             throw new ProcessExecutionException(ie);
  138.         }
  139.     }
  140.  
  141.     private class IOAttachRunner implements InputStreamRunner {
  142.  
  143.         private InputStream inputStream;
  144.         private final PipedOutputStream pipedOutputStream;
  145.  
  146.         public IOAttachRunner(final PipedOutputStream pipedOutputStream) {
  147.             this.pipedOutputStream = pipedOutputStream;
  148.         }
  149.        
  150.  
  151.         public void run() {
  152.             //System.out.println(this);
  153.             try {
  154.                 run0();
  155.             } catch (final Exception ex) {
  156.                 //log ie
  157.                 System.err.println(Thread.currentThread().getName());
  158.                 ex.printStackTrace();
  159.             }
  160.             finally {
  161.                 System.err.println("Exiting thread... " + Thread.currentThread().getName());
  162.             }
  163.         }
  164.  
  165.         public void run0() throws InterruptedException, IOException {
  166.             final int bufferSize = 1024;
  167.             byte[] bytes = new byte[bufferSize];
  168.  
  169.             try {
  170.                 int readCount = 0;
  171.                 while ( (readCount = inputStream.read(bytes)) > 0 ) {
  172.                     System.out.write(bytes,0,readCount);
  173.                     pipedOutputStream.write(bytes,0,readCount);
  174.                     System.out.println("[" + Thread.currentThread().getName() + "] ReadCount : " + readCount);
  175.                 }
  176.                
  177.             } finally {
  178.                 pipedOutputStream.close();
  179.             }
  180.         }
  181.  
  182.         public void destroy() {
  183.             try {
  184.                 destroy0();
  185.             } catch (final Exception ex) {
  186.                 //log ex
  187.                 ex.printStackTrace();
  188.             }
  189.         }
  190.  
  191.         private void destroy0() throws IOException {
  192.             try {
  193.                 Thread.currentThread().interrupt();
  194.             } finally {
  195.                 if (pipedOutputStream == null) {
  196.                     pipedOutputStream.close();
  197.                 }
  198.             }
  199.         }
  200.  
  201.         public void setInputStream(final InputStream inputStream) {
  202.             this.inputStream = inputStream;
  203.         }
  204.     }
  205.  
  206.     public InputStream getStandardOutput() throws ProcessExecutionException {
  207.         if (standardOutput == null && standardOutputRunner == null && process == null) {
  208.             throw new ProcessExecutionException("Process has not started");
  209.         } else if (standardOutput == null && standardOutputRunner != null) {
  210.             throw new ProcessExecutionException("Standard Output off loaded to "
  211.                     + standardOutputRunner.getClass().getName());
  212.         } else if (standardOutput == null) {
  213.             throw new ProcessExecutionException("Standard Output is not set");
  214.         }
  215.  
  216.         return standardOutput;
  217.     }
  218.  
  219.     public InputStream getStandardError() throws ProcessExecutionException {
  220.         if (standardError == null && standardErrorRunner == null && process == null) {
  221.             throw new ProcessExecutionException("Process has not started");
  222.         } else if (standardError == null && standardErrorRunner != null) {
  223.             throw new ProcessExecutionException("Standard Error off loaded to "
  224.                     + standardErrorRunner.getClass().getName());
  225.         } else if (standardError == null) {
  226.             throw new ProcessExecutionException("Standard Error is not set");
  227.         }
  228.  
  229.         return standardError;
  230.     }
  231.  
  232.     public OutputStream getStandardInput() throws ProcessExecutionException {
  233.         if (standardInput == null && process == null) {
  234.             throw new ProcessExecutionException("Process has not started");
  235.         } else if (standardInput == null) {
  236.             throw new ProcessExecutionException("Standard Input is not set");
  237.         }
  238.  
  239.         return standardInput;
  240.     }
  241.  
  242.     public void stop() throws ProcessExecutionException {
  243.         try {
  244.             stop0();
  245.         } catch (final Exception ex) {
  246.             throw new ProcessExecutionException("Process stop failed", ex);
  247.         }
  248.     }
  249.  
  250.     private void stop0() {
  251.         try {
  252.             standardErrorRunner.destroy();
  253.             standardOutputRunner.destroy();
  254.         } finally {
  255.             process.destroy();
  256.         }
  257.     }
  258.  
  259.     public int exitCode() throws ProcessExecutionException {
  260.         try {
  261.             return process.exitValue();
  262.         } catch (final Exception ex) {
  263.             throw new ProcessExecutionException("Can not get exit code", ex);
  264.         }
  265.     }
  266. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement