Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.util.concurrent.atomic.AtomicInteger;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.ReentrantLock;
/**
 * A bounded FIFO blocking queue backed by a fixed-size circular array and
 * guarded by two independent locks: one for producers ({@code putLock}) and
 * one for consumers ({@code takeLock}).
 *
 * <p>Producers only touch {@code putIndex} and consumers only touch
 * {@code takeIndex}, so a put and a take may run concurrently. The shared
 * {@link AtomicInteger} {@code count} is the only state both sides update;
 * it is modified exactly once per successful {@link #put} or {@link #take}.
 * Because a slot is written before {@code count} is incremented and cleared
 * before {@code count} is decremented, the atomic update also publishes the
 * slot contents to the other side.
 *
 * <p>{@code null} elements are not permitted.
 */
public class ArrayBlockingQueueUsingTwoLockApproach {

    /** The queued items, used as a circular buffer. */
    final Object[] items;

    /** Index in {@code items} of the next element to take. Guarded by {@code takeLock}. */
    int takeIndex;

    /** Index in {@code items} of the next free slot to put into. Guarded by {@code putLock}. */
    int putIndex;

    /** Current number of elements; updated only by {@code put} and {@code take}. */
    private final AtomicInteger count = new AtomicInteger();

    /** Lock held by take. */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for takers blocked on an empty queue. */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put. */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for putters blocked on a full queue. */
    private final Condition notFull = putLock.newCondition();

    /**
     * Creates a queue with the given fixed capacity.
     *
     * @param capacity maximum number of elements the queue can hold
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    public ArrayBlockingQueueUsingTwoLockApproach(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.items = new Object[capacity];
    }

    /**
     * Inserts the element at the tail of the queue, waiting while the queue is full.
     *
     * @param e the element to add
     * @throws NullPointerException if {@code e} is null
     * @throws InterruptedException if interrupted while acquiring the lock or waiting for space
     */
    public void put(Object e) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        final int c; // element count observed immediately BEFORE this insertion
        putLock.lockInterruptibly();
        try {
            // Loop guards against spurious wake-ups. A concurrent take can only
            // lower count, so "not full" stays true once observed under putLock.
            while (count.get() == items.length) {
                notFull.await();
            }
            enqueue(e);
            // Single point where the count grows; must come after the slot write
            // so that a taker which sees the new count also sees the element.
            c = count.getAndIncrement();
            if (c + 1 < items.length) {
                // Still room left: cascade the wake-up to another waiting putter.
                notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            // Queue went from empty to non-empty: takers may be parked.
            signalNotEmpty();
        }
    }

    /**
     * Removes and returns the element at the head of the queue, waiting while
     * the queue is empty.
     *
     * @return the head element
     * @throws InterruptedException if interrupted while acquiring the lock or waiting for an element
     */
    public Object take() throws InterruptedException {
        final Object x;
        final int c; // element count observed immediately BEFORE this removal
        takeLock.lockInterruptibly();
        try {
            // Loop guards against spurious wake-ups. A concurrent put can only
            // raise count, so "not empty" stays true once observed under takeLock.
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            // Single point where the count shrinks; must come after the slot is
            // cleared so that a putter which sees the free slot may safely reuse it.
            c = count.getAndDecrement();
            if (c > 1) {
                // Elements remain: cascade the wake-up to another waiting taker.
                notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == items.length) {
            // Queue went from full to not-full: putters may be parked.
            signalNotFull();
        }
        return x;
    }

    /**
     * Signals a waiting take. Called only from put, after putLock has been
     * released, so the two locks are never held at the same time.
     */
    private void signalNotEmpty() {
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Stores the element at the current put position and advances the put
     * index, wrapping around at the end of the array. Does not touch
     * {@code count}; the caller updates it. Call only while holding putLock.
     */
    private void enqueue(Object x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length) {
            putIndex = 0;
        }
    }

    /**
     * Removes the element at the current take position, clears the slot, and
     * advances the take index, wrapping around at the end of the array. Does
     * not touch {@code count}; the caller updates it. Call only while holding
     * takeLock.
     *
     * @return the element that was stored at the take position
     */
    private Object dequeue() {
        final Object[] items = this.items;
        final Object x = items[takeIndex];
        items[takeIndex] = null; // drop the reference so the element can be collected
        if (++takeIndex == items.length) {
            takeIndex = 0;
        }
        return x;
    }

    /**
     * Signals a waiting put. Called only from take, after takeLock has been
     * released, so the two locks are never held at the same time.
     */
    private void signalNotFull() {
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement