Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // Implemented as a solution to the question in: http://stackoverflow.com/questions/13005964/thread-pool-where-workers-are-both-producers-and-consumers
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.RejectedExecutionHandler;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.Lock;
- import java.util.concurrent.locks.ReentrantLock;
/**
 * A {@link ThreadPoolExecutor} that counts the jobs handed to it through
 * {@link #executeJob} and lets a caller block, with a timeout, until every
 * submitted job has finished running.
 *
 * <p>Jobs may themselves call {@link #executeJob} (workers acting as both
 * producers and consumers); the completion condition is simply
 * "completed count has caught up with submitted count".
 *
 * <p>All bookkeeping state (both counters and the timed-out flag) is guarded
 * by a single {@link Lock}, and the completion predicate is always evaluated
 * while holding that lock so that a completion signal cannot slip in between
 * the check and the wait.
 */
public class TestExecutor extends ThreadPoolExecutor {

    // Guards every field below and backs terminationCondition.
    private final Lock lock = new ReentrantLock();

    // Signalled whenever completedJobCount catches up with submittedJobCount.
    private final Condition terminationCondition = lock.newCondition();

    // Number of jobs accepted via executeJob() / number that have finished running.
    private long submittedJobCount, completedJobCount;

    // Whether the most recent awaitCompletion() call gave up because of its timeout.
    private boolean workTimedOut;

    /**
     * Creates the executor; all arguments are passed unchanged to
     * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue, RejectedExecutionHandler)}.
     */
    public TestExecutor(int corePoolSize, int maximumPoolSize,
                        long keepAliveTime, TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        // The counters start at 0 and the flag at false by default, which is
        // exactly the reset() state, so no overridable method is called here.
    }

    /**
     * Clears the submitted/completed counters and the timed-out flag.
     * Any thread blocked in {@link #awaitCompletion} is woken so it can
     * re-evaluate the (now trivially satisfied) completion condition.
     */
    public void reset() {
        lock.lock();
        try {
            this.submittedJobCount = 0;
            this.completedJobCount = 0;
            this.workTimedOut = false;
            terminationCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * @return whether the most recent {@link #awaitCompletion} call (since the
     *         last {@link #reset()}) returned because its timeout elapsed
     */
    public boolean awaitCompletionTimedOut() {
        lock.lock();
        try {
            return this.workTimedOut;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Wraps a comparable work unit in a counting job and schedules it.
     *
     * @param comparableWorkUnit the work to run; must not be {@code null}
     * @throws NullPointerException if {@code comparableWorkUnit} is {@code null}
     * @throws RuntimeException whatever {@link ThreadPoolExecutor#execute} throws
     *         (for example a rejection); in that case the job is not counted
     */
    public <T extends Comparable<T> & Runnable> void executeJob(T comparableWorkUnit) {
        if (comparableWorkUnit == null) {
            throw new NullPointerException("comparableWorkUnit");
        }
        Job<T> job = new Job<T>(comparableWorkUnit);
        // Count the job BEFORE scheduling it: a fast worker could otherwise
        // finish it and push completedJobCount past submittedJobCount.
        incrementSubmittedJobs();
        boolean accepted = false;
        try {
            super.execute(job);
            accepted = true;
        } finally {
            if (!accepted) {
                // execute() threw, so the job will never run; un-count it or
                // awaitCompletion() would wait for it forever.
                rollbackSubmittedJob();
            }
        }
    }

    /**
     * Blocks until every job submitted through {@link #executeJob} has
     * completed, or until the timeout elapses.
     *
     * <p>If the calling thread is interrupted while waiting, the wait
     * continues (as it always has), but the thread's interrupt status is
     * restored before this method returns.
     *
     * @param timeout maximum time to wait; values {@code <= 0} mean "do not wait"
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the timeout elapsed with jobs still outstanding,
     *         {@code false} if all submitted jobs had completed
     */
    public boolean awaitCompletion(long timeout, TimeUnit unit) {
        // Clamp to >= 0 and use the monotonic clock: "deadline - nanoTime()"
        // is then overflow-safe even for a Long.MAX_VALUE timeout.
        final long timeoutNanos = Math.max(0L, unit.toNanos(timeout));
        final long deadline = System.nanoTime() + timeoutNanos;
        boolean interrupted = false;

        lock.lock();
        try {
            this.workTimedOut = false;
            // The predicate is checked under the lock, so a job cannot
            // complete and signal between this check and awaitNanos().
            while (completedJobCount < submittedJobCount) {
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0L) {
                    this.workTimedOut = true;
                    break;
                }
                System.out.println("Executor now awaiting the termination condition for a maximum of: " + TimeUnit.NANOSECONDS.toMillis(remainingNanos) + "ms.");
                try {
                    terminationCondition.awaitNanos(remainingNanos);
                } catch (InterruptedException e) {
                    // Remember the interrupt and keep waiting; the flag is
                    // re-asserted in the finally block below.
                    interrupted = true;
                }
            }
            return this.workTimedOut;
        } finally {
            lock.unlock();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Records that one more job has been handed to the pool.
    private void incrementSubmittedJobs() {
        lock.lock();
        try {
            this.submittedJobCount++;
        } finally {
            lock.unlock();
        }
    }

    // Undoes incrementSubmittedJobs() for a job the pool refused, waking waiters if that completes the work.
    private void rollbackSubmittedJob() {
        lock.lock();
        try {
            this.submittedJobCount--;
            if (completedJobCount >= submittedJobCount) {
                terminationCondition.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    // Records that one job has finished and wakes waiters once nothing is outstanding.
    private void incrementCompletedJobs() {
        lock.lock();
        try {
            this.completedJobCount++;
            if (completedJobCount >= submittedJobCount) {
                terminationCondition.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Wrapper that runs the user's work unit and then reports completion to
     * the enclosing executor. Ordering delegates to the wrapped work unit.
     */
    private class Job<T extends Comparable<T> & Runnable> implements Runnable, Comparable<Job<T>> {
        private final T job;

        public Job(T job) {
            this.job = job;
        }

        @Override
        public void run() {
            try {
                this.job.run();
            } finally {
                // Count the job as completed even if it threw, and signal
                // waiters when this was the last outstanding job.
                incrementCompletedJobs();
            }
        }

        @Override
        public int compareTo(Job<T> o) {
            return this.job.compareTo(o.job);
        }
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement