// Implemented as a solution to the question in: http://stackoverflow.com/questions/13005964/thread-pool-where-workers-are-both-producers-and-consumers
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TestExecutor extends ThreadPoolExecutor {
private long submittedJobCount, completedJobCount;
private Lock lock;
private Condition terminationCondition;
private boolean workTimedOut;
public TestExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
// configure the thread pool executor as normal
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
// a lock for notifying when completion is detected
this.lock = new ReentrantLock();
this.terminationCondition = lock.newCondition();
// set the counters and flags
reset();
}
public void reset() {
// a count of the number of submitted and completed jobs
this.submittedJobCount = 0;
this.completedJobCount = 0;
// a flag for recording whether or not work has previously timed out on an awaitCompletion() call
this.workTimedOut = false;
}
// returns whether or not this executor has previously timed out on an awaitCompletion() call
public boolean awaitCompletionTimedOut() {
return this.workTimedOut;
}
// executes a new comparable runnable instance
public <T extends Comparable<T> & Runnable> void executeJob(T comparableWorkUnit) {
// create a new job instance from a comparable work unit
Job<T> job = new Job<T>(comparableWorkUnit);
// synchronously increment the number of submitted jobs
incrementSubmittedJobs();
// schedule this job for execution by a pool thread
super.execute(job);
}
// waits for the termination condition: all queued jobs completed
public boolean awaitCompletion(long timeout, TimeUnit unit) {
// record the maximum computation time
long maxTimeMS = System.currentTimeMillis() + unit.toMillis(timeout);
// a flag to record whether or not the timeout has been exceeded
this.workTimedOut = System.currentTimeMillis() > maxTimeMS;
// while there are still jobs to perform and the max time hasn't been exceeded
while(completedJobCount < submittedJobCount && !this.workTimedOut) {
// lock to await on the termination condition
lock.lock();
try {
// determine the remaining time to spend awaiting completion
long waitTimeMS = maxTimeMS - System.currentTimeMillis();
if(waitTimeMS > 0) {
System.out.println("Executor now awaiting the termination condition for a maximum of: " + waitTimeMS + "ms.");
this.workTimedOut = !terminationCondition.await(waitTimeMS, TimeUnit.MILLISECONDS);
}
else {
this.workTimedOut = true;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
finally {
lock.unlock();
}
}
// return whether or not this method timed out waiting for the termination condition
return this.workTimedOut;
}
private synchronized void incrementSubmittedJobs() {
this.submittedJobCount++;
}
private synchronized void incrementCompletedJobs() {
this.completedJobCount++;
}
// private wrapper class for the runnable, comparable work
private class Job<T extends Comparable<T> & Runnable> implements Runnable, Comparable<Job<T>> {
private T job;
public Job(T job) {
this.job = job;
}
public void run() {
try {
this.job.run();
}
finally {
// synchronously increment the number of completed jobs
incrementCompletedJobs();
if(completedJobCount >= submittedJobCount) {
// lock to signal on the termination condition
lock.lock();
try {
terminationCondition.signalAll();
}
finally {
lock.unlock();
}
}
}
}
public int compareTo(Job<T> o) {
return this.job.compareTo(o.job);
}
}
}