Advertisement
Guest User

ABQ

a guest
Jul 2nd, 2014
1,269
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 3.91 KB | None | 0 0
  1. import java.util.concurrent.atomic.AtomicInteger;
  2. import java.util.concurrent.locks.Condition;
  3. import java.util.concurrent.locks.ReentrantLock;
  4.  
  5. public class ArrayBlockingQueueUsingTwoLockApproach {
  6.    
  7.      /** The queued items */
  8.     final Object[] items;
  9.  
  10.     /** items index for next take, poll, peek or remove */
  11.     int takeIndex;
  12.  
  13.     /** items index for next put, offer, or add */
  14.     int putIndex;
  15.  
  16.     /** Current number of elements */
  17.     private final AtomicInteger count = new AtomicInteger();
  18.  
  19.     /** Lock held by take, poll, etc */
  20.     private final ReentrantLock takeLock = new ReentrantLock();
  21.  
  22.     /** Wait queue for waiting takes */
  23.     private final Condition notEmpty = takeLock.newCondition();
  24.  
  25.     /** Lock held by put, offer, etc */
  26.     private final ReentrantLock putLock = new ReentrantLock();
  27.  
  28.     /** Wait queue for waiting puts */
  29.     private final Condition notFull = putLock.newCondition();
  30.  
  31.     public ArrayBlockingQueueUsingTwoLockApproach(int capacity) {
  32.         if (capacity <= 0)
  33.             throw new IllegalArgumentException();
  34.         this.items = new Object[capacity];
  35.     }
  36.    
  37.     public void put(Object e) throws InterruptedException {
  38.         if (e == null) throw new NullPointerException();
  39.         int c = -1;
  40.         final ReentrantLock putLock = this.putLock;
  41.         final AtomicInteger count = this.count;
  42.         putLock.lockInterruptibly();
  43.         try {
  44.             while (count.get() == items.length) {
  45.                 notFull.await();
  46.             }
  47.             enqueue(e);
  48.             c = count.getAndIncrement();
  49.             if (c + 1 < items.length)
  50.                 notFull.signal();
  51.         } finally {
  52.             putLock.unlock();
  53.         }
  54.         if (c == 0)
  55.             signalNotEmpty();
  56.     }
  57.    
  58.     public Object take() throws InterruptedException {
  59.         Object x;
  60.         int c = -1;
  61.         final AtomicInteger count = this.count;
  62.         final ReentrantLock takeLock = this.takeLock;
  63.         takeLock.lockInterruptibly();
  64.         try {
  65.             while (count.get() == 0) {
  66.                 notEmpty.await();
  67.             }
  68.             x = dequeue();
  69.             c = count.getAndDecrement();
  70.             if (c > 1)
  71.                 notEmpty.signal();
  72.         } finally {
  73.             takeLock.unlock();
  74.         }
  75.         if (c == items.length)
  76.             signalNotFull();
  77.         return x;
  78.     }
  79.    
  80.    
  81.     private void signalNotEmpty() {
  82.         final ReentrantLock takeLock = this.takeLock;
  83.         takeLock.lock();
  84.         try {
  85.             notEmpty.signal();
  86.         } finally {
  87.             takeLock.unlock();
  88.         }
  89.     }
  90.  
  91.    
  92.     /**
  93.      * Inserts element at current put position, advances, and signals.
  94.      * Call only when holding lock.
  95.      */
  96.     private void enqueue(Object x) {
  97.         // assert lock.getHoldCount() == 1;
  98.         // assert items[putIndex] == null;
  99.         final Object[] items = this.items;
  100.         items[putIndex] = x;
  101.         if (++putIndex == items.length)
  102.             putIndex = 0;
  103.         count.incrementAndGet();
  104.     }
  105.  
  106.    
  107.     /**
  108.      * Extracts element at current take position, advances, and signals.
  109.      * Call only when holding lock.
  110.      */
  111.     private Object dequeue() {
  112.         // assert lock.getHoldCount() == 1;
  113.         // assert items[takeIndex] != null;
  114.         final Object[] items = this.items;
  115.         @SuppressWarnings("unchecked")
  116.         Object x = (Object) items[takeIndex];
  117.         items[takeIndex] = null;
  118.         if (++takeIndex == items.length)
  119.             takeIndex = 0;
  120.         count.decrementAndGet();
  121.         return x;
  122.     }
  123.    
  124.     /**
  125.      * Signals a waiting put. Called only from take/poll.
  126.      */
  127.     private void signalNotFull() {
  128.         final ReentrantLock putLock = this.putLock;
  129.         putLock.lock();
  130.         try {
  131.             notFull.signal();
  132.         } finally {
  133.             putLock.unlock();
  134.         }
  135.     }
  136. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement