Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package edu.nitin.testcodes;
- import java.io.InputStream;
- import java.io.OutputStream;
- public interface ProcessExecutor {
- public void start() throws ProcessExecutionException;
- public InputStream getStandardOutput() throws ProcessExecutionException;
- public InputStream getStandardError() throws ProcessExecutionException;
- public OutputStream getStandardInput() throws ProcessExecutionException;
- public int waitFor() throws ProcessExecutionException;
- public void stop() throws ProcessExecutionException;
- public int exitCode() throws ProcessExecutionException;
- }
- //====
- package edu.nitin.testcodes;
/**
 * Implemented by objects that can be torn down on request.
 */
public interface Destructible {

    /**
     * Tears this object down. Declares no checked exception, so implementations
     * have to deal with any failure themselves.
     */
    void destroy();
}
- //====
- package edu.nitin.testcodes;
- public interface DestrutableRunner extends Runnable, Destructible {
- }
- //====
- package edu.nitin.testcodes;
- import java.io.InputStream;
- public interface InputStreamRunner extends DestrutableRunner {
- public void setInputStream(final InputStream inputStream);
- }
- //===
- package edu.nitin.testcodes;
- import java.io.*;
- import java.util.ArrayList;
- import java.util.List;
- public class FlumeProcessExecutor implements ProcessExecutor {
- private Process process;
- private final List<String> command;
- private InputStream standardOutput;
- private InputStream standardError;
- private OutputStream standardInput;
- private InputStreamRunner standardOutputRunner;
- private InputStreamRunner standardErrorRunner;
- public FlumeProcessExecutor(final String... command) throws ProcessExecutionException {
- this.command = new ArrayList<String>(command.length);
- for (final String arg : command) {
- this.command.add(arg);
- }
- }
- public FlumeProcessExecutor(final List<String> command) throws ProcessExecutionException {
- this(command, null, null);
- }
- public FlumeProcessExecutor(final List<String> command,
- final InputStreamRunner standardErrorRunner) throws ProcessExecutionException {
- this(command, standardErrorRunner, null);
- }
- public FlumeProcessExecutor(final List<String> command,
- final InputStreamRunner standardErrorRunner,
- final InputStreamRunner standardOutputRunner) throws ProcessExecutionException {
- if (command == null) {
- throw new ProcessExecutionException("Command can not be null");
- }
- this.command = command;
- this.standardErrorRunner = standardErrorRunner;
- this.standardOutputRunner = standardOutputRunner;
- }
- public void start() throws ProcessExecutionException {
- //System.err.println(command);
- try {
- process = new ProcessBuilder(command).start();
- } catch (final IOException ioe) {
- throw new ProcessExecutionException("Could not build process", ioe);
- }
- //System.err.println(process);
- standardInput = process.getOutputStream();
- //Pipe standard out
- if (standardOutputRunner == null) {
- final PipedOutputStream pipedOutputStream = new PipedOutputStream();
- standardOutput = pipeOutput(pipedOutputStream);
- standardOutputRunner = attachOutput(pipedOutputStream);
- }
- standardOutputRunner.setInputStream(process.getInputStream());
- final Thread standardOutputThread = new Thread(standardOutputRunner, "stdout");
- standardOutputThread.start();
- //Pipe standard err
- if (standardErrorRunner == null) {
- final PipedOutputStream pipedOutputStream = new PipedOutputStream();
- standardError = pipeOutput(pipedOutputStream);
- standardErrorRunner = attachOutput(pipedOutputStream);
- }
- standardErrorRunner.setInputStream(process.getErrorStream());
- final Thread standardErrThread = new Thread(standardErrorRunner, "stderr");
- standardErrThread.start();
- }
- private PipedInputStream pipeOutput(final PipedOutputStream pipedOutputStream) throws ProcessExecutionException {
- final PipedInputStream pipedInputStream;
- try {
- pipedInputStream = new PipedInputStream(pipedOutputStream);
- } catch (final IOException ioe) {
- throw new ProcessExecutionException("Could not pipe streams", ioe);
- }
- return pipedInputStream;
- }
- private InputStreamRunner attachOutput(final PipedOutputStream pipedOutputStream) {
- final InputStreamRunner ioAttachRunner = new IOAttachRunner(pipedOutputStream);
- return ioAttachRunner;
- }
- public int waitFor() throws ProcessExecutionException {
- try {
- return process.waitFor();
- }
- catch(final InterruptedException ie) {
- throw new ProcessExecutionException(ie);
- }
- }
- private class IOAttachRunner implements InputStreamRunner {
- private InputStream inputStream;
- private final PipedOutputStream pipedOutputStream;
- public IOAttachRunner(final PipedOutputStream pipedOutputStream) {
- this.pipedOutputStream = pipedOutputStream;
- }
- public void run() {
- //System.out.println(this);
- try {
- run0();
- } catch (final Exception ex) {
- //log ie
- System.err.println(Thread.currentThread().getName());
- ex.printStackTrace();
- }
- finally {
- System.err.println("Exiting thread... " + Thread.currentThread().getName());
- }
- }
- public void run0() throws InterruptedException, IOException {
- final int bufferSize = 1024;
- byte[] bytes = new byte[bufferSize];
- try {
- int readCount = 0;
- while ( (readCount = inputStream.read(bytes)) > 0 ) {
- System.out.write(bytes,0,readCount);
- pipedOutputStream.write(bytes,0,readCount);
- System.out.println("[" + Thread.currentThread().getName() + "] ReadCount : " + readCount);
- }
- } finally {
- pipedOutputStream.close();
- }
- }
- public void destroy() {
- try {
- destroy0();
- } catch (final Exception ex) {
- //log ex
- ex.printStackTrace();
- }
- }
- private void destroy0() throws IOException {
- try {
- Thread.currentThread().interrupt();
- } finally {
- if (pipedOutputStream == null) {
- pipedOutputStream.close();
- }
- }
- }
- public void setInputStream(final InputStream inputStream) {
- this.inputStream = inputStream;
- }
- }
- public InputStream getStandardOutput() throws ProcessExecutionException {
- if (standardOutput == null && standardOutputRunner == null && process == null) {
- throw new ProcessExecutionException("Process has not started");
- } else if (standardOutput == null && standardOutputRunner != null) {
- throw new ProcessExecutionException("Standard Output off loaded to "
- + standardOutputRunner.getClass().getName());
- } else if (standardOutput == null) {
- throw new ProcessExecutionException("Standard Output is not set");
- }
- return standardOutput;
- }
- public InputStream getStandardError() throws ProcessExecutionException {
- if (standardError == null && standardErrorRunner == null && process == null) {
- throw new ProcessExecutionException("Process has not started");
- } else if (standardError == null && standardErrorRunner != null) {
- throw new ProcessExecutionException("Standard Error off loaded to "
- + standardErrorRunner.getClass().getName());
- } else if (standardError == null) {
- throw new ProcessExecutionException("Standard Error is not set");
- }
- return standardError;
- }
- public OutputStream getStandardInput() throws ProcessExecutionException {
- if (standardInput == null && process == null) {
- throw new ProcessExecutionException("Process has not started");
- } else if (standardInput == null) {
- throw new ProcessExecutionException("Standard Input is not set");
- }
- return standardInput;
- }
- public void stop() throws ProcessExecutionException {
- try {
- stop0();
- } catch (final Exception ex) {
- throw new ProcessExecutionException("Process stop failed", ex);
- }
- }
- private void stop0() {
- try {
- standardErrorRunner.destroy();
- standardOutputRunner.destroy();
- } finally {
- process.destroy();
- }
- }
- public int exitCode() throws ProcessExecutionException {
- try {
- return process.exitValue();
- } catch (final Exception ex) {
- throw new ProcessExecutionException("Can not get exit code", ex);
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement