Advertisement
Guest User

TestExecutor

a guest
Oct 31st, 2012
350
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5.02 KB | None | 0 0
  1. // Implemented as a solution to the question in: http://stackoverflow.com/questions/13005964/thread-pool-where-workers-are-both-producers-and-consumers
  2.  
  3. import java.util.concurrent.BlockingQueue;
  4. import java.util.concurrent.RejectedExecutionHandler;
  5. import java.util.concurrent.ThreadPoolExecutor;
  6. import java.util.concurrent.TimeUnit;
  7. import java.util.concurrent.locks.Condition;
  8. import java.util.concurrent.locks.Lock;
  9. import java.util.concurrent.locks.ReentrantLock;
  10.  
  11. public class TestExecutor extends ThreadPoolExecutor {
  12.  
  13.     private long submittedJobCount, completedJobCount;
  14.     private Lock lock;
  15.     private Condition terminationCondition;
  16.     private boolean workTimedOut;
  17.    
  18.     public TestExecutor(int corePoolSize, int maximumPoolSize,
  19.             long keepAliveTime, TimeUnit unit,
  20.             BlockingQueue<Runnable> workQueue,
  21.             RejectedExecutionHandler handler) {
  22.        
  23.         // configure the thread pool executor as normal
  24.         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
  25.  
  26.         // a lock for notifying when completion is detected
  27.         this.lock = new ReentrantLock();
  28.         this.terminationCondition = lock.newCondition();
  29.        
  30.         // set the counters and flags
  31.         reset();
  32.     }
  33.    
  34.     public void reset() {
  35.         // a count of the number of submitted and completed jobs
  36.         this.submittedJobCount = 0;
  37.         this.completedJobCount = 0;
  38.  
  39.         // a flag for recording whether or not work has previously timed out on an awaitCompletion() call
  40.         this.workTimedOut = false;
  41.     }
  42.    
  43.     // returns whether or not this executor has previously timed out on an awaitCompletion() call
  44.     public boolean awaitCompletionTimedOut() {
  45.         return this.workTimedOut;
  46.     }
  47.  
  48.     // executes a new comparable runnable instance
  49.     public <T extends Comparable<T> & Runnable> void executeJob(T comparableWorkUnit) {
  50.  
  51.         // create a new job instance from a comparable work unit
  52.         Job<T> job = new Job<T>(comparableWorkUnit);
  53.        
  54.         // synchronously increment the number of submitted jobs
  55.         incrementSubmittedJobs();
  56.        
  57.         // schedule this job for execution by a pool thread
  58.         super.execute(job);
  59.     }
  60.  
  61.     // waits for the termination condition: all queued jobs completed
  62.     public boolean awaitCompletion(long timeout, TimeUnit unit) {
  63.  
  64.         // record the maximum computation time
  65.         long maxTimeMS = System.currentTimeMillis() + unit.toMillis(timeout);
  66.        
  67.         // a flag to record whether or not the timeout has been exceeded
  68.         this.workTimedOut = System.currentTimeMillis() > maxTimeMS;
  69.        
  70.         // while there are still jobs to perform and the max time hasn't been exceeded
  71.         while(completedJobCount < submittedJobCount && !this.workTimedOut) {
  72.             // lock to await on the termination condition
  73.             lock.lock();
  74.             try {
  75.                 // determine the remaining time to spend awaiting completion
  76.                 long waitTimeMS = maxTimeMS - System.currentTimeMillis();
  77.                 if(waitTimeMS > 0) {
  78.                     System.out.println("Executor now awaiting the termination condition for a maximum of: " + waitTimeMS + "ms.");
  79.                     this.workTimedOut = !terminationCondition.await(waitTimeMS, TimeUnit.MILLISECONDS);    
  80.                 }
  81.                 else {
  82.                     this.workTimedOut = true;
  83.                 }
  84.             } catch (InterruptedException e) {
  85.                 e.printStackTrace();
  86.             }
  87.             finally {
  88.                 lock.unlock();
  89.             }
  90.         }
  91.        
  92.         // return whether or not this method timed out waiting for the termination condition
  93.         return this.workTimedOut;
  94.     }
  95.    
  96.     private synchronized void incrementSubmittedJobs() {
  97.         this.submittedJobCount++;
  98.     }
  99.  
  100.     private synchronized void incrementCompletedJobs() {
  101.         this.completedJobCount++;
  102.     }
  103.  
  104.     // private wrapper class for the runnable, comparable work
  105.     private class Job<T extends Comparable<T> & Runnable> implements Runnable, Comparable<Job<T>> {
  106.        
  107.         private T job;
  108.        
  109.         public Job(T job) {
  110.             this.job = job;
  111.         }
  112.  
  113.         public void run() {
  114.             try {
  115.                 this.job.run();
  116.             }
  117.             finally {
  118.                 // synchronously increment the number of completed jobs
  119.                 incrementCompletedJobs();
  120.  
  121.                 if(completedJobCount >= submittedJobCount) {
  122.                     // lock to signal on the termination condition
  123.                     lock.lock();
  124.                     try {
  125.                         terminationCondition.signalAll();
  126.                     }
  127.                     finally {
  128.                         lock.unlock();
  129.                     }
  130.                 }
  131.             }
  132.         }
  133.        
  134.         public int compareTo(Job<T> o) {
  135.             return this.job.compareTo(o.job);
  136.         }
  137.     }
  138. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement