Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.util.Collection;
- import java.util.Iterator;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.atomic.AtomicInteger;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.ReentrantLock;
/**
 * A bounded, array-backed {@link BlockingQueue} that guards the head and the tail of its circular
 * buffer with two separate locks, so that one producer and one consumer can make progress at the
 * same time.
 *
 * <p>Producers serialize on {@code putLock} and own {@code putIndex}; consumers serialize on
 * {@code takeLock} and own {@code takeIndex}. The element count is an {@link AtomicInteger} shared
 * by both sides: a producer increments it only after writing its slot, and a consumer decrements it
 * only after clearing its slot, so observing the count also publishes the slot contents to the
 * other side.
 *
 * <p>Operations that need a view of the whole queue ({@code contains}, {@code remove(Object)},
 * {@code clear}, {@code toArray}, {@code removeAll}, {@code retainAll}) acquire both locks, always
 * in the order put-then-take. {@code null} elements are not permitted.
 *
 * @param <E> the type of elements held in this queue
 */
public class ArrayBlockingQueueTwoLocks<E> implements BlockingQueue<E> {

    /** Circular buffer of queued elements; unused slots hold {@code null}. */
    final Object[] items;

    /** Index of the next element to remove; guarded by {@code takeLock}. */
    int takeIndex;

    /** Index of the next free slot; guarded by {@code putLock}. */
    int putIndex;

    /** Number of elements currently in the queue. */
    private final AtomicInteger count = new AtomicInteger();

    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

    /**
     * Creates a queue with the given fixed capacity.
     *
     * @param capacity the maximum number of elements the queue can hold
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    public ArrayBlockingQueueTwoLocks(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException();
        }
        this.items = new Object[capacity];
    }

    // ------------------------------------------------------------------ insertion

    /**
     * Inserts the element at the tail, waiting for space if the queue is full.
     *
     * @throws NullPointerException if {@code e} is null
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public void put(E e) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        final int c;
        putLock.lockInterruptibly();
        try {
            while (count.get() == items.length) {
                notFull.await();
            }
            c = insert(e);
        } finally {
            putLock.unlock();
        }
        // The queue was empty before this insert, so a consumer may be parked.
        if (c == 0) {
            signalNotEmpty();
        }
    }

    /**
     * Inserts the element at the tail if space is immediately available.
     *
     * @return {@code true} if the element was added, {@code false} if the queue is full
     * @throws NullPointerException if {@code e} is null
     */
    @Override
    public boolean offer(E e) {
        if (e == null) {
            throw new NullPointerException();
        }
        if (count.get() == items.length) {
            return false; // cheap pre-check without taking the lock
        }
        int c = -1;
        putLock.lock();
        try {
            if (count.get() < items.length) {
                c = insert(e);
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
        return c >= 0;
    }

    /**
     * Inserts the element at the tail, waiting up to the given time for space.
     *
     * @return {@code true} if the element was added, {@code false} if the wait timed out
     * @throws NullPointerException if {@code e} is null
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        long nanos = unit.toNanos(timeout);
        final int c;
        putLock.lockInterruptibly();
        try {
            while (count.get() == items.length) {
                if (nanos <= 0L) {
                    return false;
                }
                nanos = notFull.awaitNanos(nanos);
            }
            c = insert(e);
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
        return true;
    }

    /**
     * Inserts the element at the tail or fails immediately.
     *
     * @return {@code true} (as specified by {@link java.util.Collection#add})
     * @throws IllegalStateException if the queue is full
     * @throws NullPointerException if {@code e} is null
     */
    @Override
    public boolean add(E e) {
        if (offer(e)) {
            return true;
        }
        throw new IllegalStateException("Queue full");
    }

    /**
     * Adds every element of {@code c} via {@link #add}.
     *
     * @return {@code true} if at least one element was added
     * @throws IllegalArgumentException if {@code c} is this queue
     * @throws IllegalStateException if the queue fills up before all elements are added
     */
    @Override
    public boolean addAll(Collection<? extends E> c) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        boolean modified = false;
        for (E e : c) {
            if (add(e)) {
                modified = true;
            }
        }
        return modified;
    }

    // ------------------------------------------------------------------ removal

    /**
     * Removes and returns the head, waiting if the queue is empty.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public E take() throws InterruptedException {
        final E x;
        final int c;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            // More elements remain: pass the wake-up on to the next waiting consumer.
            if (c > 1) {
                notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        // The queue was full before this removal, so a producer may be parked.
        if (c == items.length) {
            signalNotFull();
        }
        return x;
    }

    /**
     * Removes and returns the head, or returns {@code null} if the queue is empty.
     */
    @Override
    public E poll() {
        if (count.get() == 0) {
            return null; // cheap pre-check without taking the lock
        }
        E x = null;
        int c = -1;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1) {
                    notEmpty.signal();
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == items.length) {
            signalNotFull();
        }
        return x;
    }

    /**
     * Removes and returns the head, waiting up to the given time for an element.
     *
     * @return the head, or {@code null} if the wait timed out
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final E x;
        final int c;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0L) {
                    return null;
                }
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1) {
                notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == items.length) {
            signalNotFull();
        }
        return x;
    }

    /**
     * Removes and returns the head.
     *
     * @throws java.util.NoSuchElementException if the queue is empty
     */
    @Override
    public E remove() {
        final E x = poll();
        if (x == null) {
            throw new java.util.NoSuchElementException();
        }
        return x;
    }

    /**
     * Removes the first element equal to {@code o}, if any.
     *
     * @return {@code true} if an element was removed
     */
    @Override
    public boolean remove(Object o) {
        if (o == null) {
            return false;
        }
        fullyLock();
        try {
            final int i = indexOf(o);
            if (i < 0) {
                return false;
            }
            removeAt(i);
            return true;
        } finally {
            fullyUnlock();
        }
    }

    /** Removes every element that is contained in {@code c}. */
    @Override
    public boolean removeAll(Collection<?> c) {
        return bulkRemove(c, true);
    }

    /** Removes every element that is not contained in {@code c}. */
    @Override
    public boolean retainAll(Collection<?> c) {
        return bulkRemove(c, false);
    }

    /** Removes all elements from the queue. */
    @Override
    public void clear() {
        fullyLock();
        try {
            for (int i = 0; i < items.length; i++) {
                items[i] = null;
            }
            takeIndex = putIndex;
            if (count.getAndSet(0) == items.length) {
                notFull.signal(); // putLock is held by fullyLock()
            }
        } finally {
            fullyUnlock();
        }
    }

    /** Moves all available elements into {@code c}; see {@link #drainTo(Collection, int)}. */
    @Override
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Moves at most {@code maxElements} elements from the head of the queue into {@code c}.
     *
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is null
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        if (maxElements <= 0) {
            return 0;
        }
        boolean wasFull = false;
        int drained = 0;
        takeLock.lock();
        try {
            final int n = Math.min(maxElements, count.get());
            try {
                while (drained < n) {
                    // Add first, dequeue second: if add() throws, the element stays queued.
                    c.add(itemAt(takeIndex));
                    dequeue();
                    drained++;
                }
            } finally {
                // Publish the freed slots even when add() failed part-way through.
                if (drained > 0) {
                    wasFull = count.getAndAdd(-drained) == items.length;
                }
            }
        } finally {
            takeLock.unlock();
            // Must run after releasing takeLock to respect the put-then-take lock order.
            if (wasFull) {
                signalNotFull();
            }
        }
        return drained;
    }

    // ------------------------------------------------------------------ inspection

    /** Returns the head without removing it, or {@code null} if the queue is empty. */
    @Override
    public E peek() {
        if (count.get() == 0) {
            return null;
        }
        takeLock.lock();
        try {
            return count.get() > 0 ? itemAt(takeIndex) : null;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Returns the head without removing it.
     *
     * @throws java.util.NoSuchElementException if the queue is empty
     */
    @Override
    public E element() {
        final E x = peek();
        if (x == null) {
            throw new java.util.NoSuchElementException();
        }
        return x;
    }

    /** Returns the number of elements currently in the queue. */
    @Override
    public int size() {
        return count.get();
    }

    /** Returns {@code true} if the queue currently holds no elements. */
    @Override
    public boolean isEmpty() {
        return count.get() == 0;
    }

    /** Returns how many more elements can be inserted without blocking. */
    @Override
    public int remainingCapacity() {
        return items.length - count.get();
    }

    /** Returns {@code true} if some element of the queue equals {@code o}. */
    @Override
    public boolean contains(Object o) {
        if (o == null) {
            return false;
        }
        fullyLock();
        try {
            return indexOf(o) >= 0;
        } finally {
            fullyUnlock();
        }
    }

    /** Returns {@code true} if every element of {@code c} is contained in the queue. */
    @Override
    public boolean containsAll(Collection<?> c) {
        for (Object o : c) {
            if (!contains(o)) {
                return false;
            }
        }
        return true;
    }

    /** Returns a snapshot of the queue contents in head-to-tail order. */
    @Override
    public Object[] toArray() {
        fullyLock();
        try {
            final int n = count.get();
            final Object[] out = new Object[n];
            copyInto(out, n);
            return out;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Copies the queue contents, in head-to-tail order, into {@code a} if it is large enough, or
     * into a newly allocated array of the same runtime type otherwise.
     */
    @Override
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            final int n = count.get();
            if (a.length < n) {
                a = java.util.Arrays.copyOf(a, n);
            }
            copyInto(a, n);
            if (a.length > n) {
                a[n] = null; // terminator, as specified by Collection.toArray(T[])
            }
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an iterator over a snapshot of the queue taken when this method is called. The
     * iterator never reflects later modifications and does not support {@code remove()}.
     */
    @Override
    public Iterator<E> iterator() {
        final Object[] snapshot = toArray();
        return new Iterator<E>() {
            private int cursor;

            @Override
            public boolean hasNext() {
                return cursor < snapshot.length;
            }

            @Override
            @SuppressWarnings("unchecked")
            public E next() {
                if (cursor >= snapshot.length) {
                    throw new java.util.NoSuchElementException();
                }
                return (E) snapshot[cursor++];
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    // ------------------------------------------------------------------ internals

    /**
     * Writes {@code e} at the tail and bumps the count. Caller must hold {@code putLock} and must
     * have verified that a free slot exists.
     *
     * @return the element count before the insertion
     */
    private int insert(E e) {
        enqueue(e);
        final int c = count.getAndIncrement();
        // Still room left: pass the wake-up on to the next waiting producer.
        if (c + 1 < items.length) {
            notFull.signal();
        }
        return c;
    }

    /** Stores {@code x} at {@code putIndex} and advances it circularly; needs {@code putLock}. */
    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length) {
            putIndex = 0;
        }
    }

    /** Clears and returns the slot at {@code takeIndex}, advancing it; needs {@code takeLock}. */
    @SuppressWarnings("unchecked")
    private E dequeue() {
        final Object[] items = this.items;
        Object x = items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length) {
            takeIndex = 0;
        }
        return (E) x;
    }

    /** Reads slot {@code i} with the element type; only elements of type E are ever stored. */
    @SuppressWarnings("unchecked")
    private E itemAt(int i) {
        return (E) items[i];
    }

    /** Returns the circular successor of index {@code i}. */
    private int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

    /** Wakes one waiting consumer. Must not be called while holding {@code putLock}. */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /** Wakes one waiting producer. Must not be called while holding {@code takeLock}. */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /** Acquires both locks, always put-then-take, to freeze the whole queue. */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /** Releases the locks taken by {@link #fullyLock()}. */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

    /** Returns the buffer index of the first element equal to {@code o}, or -1; needs both locks. */
    private int indexOf(Object o) {
        final int n = count.get();
        for (int i = takeIndex, k = 0; k < n; k++, i = inc(i)) {
            if (o.equals(items[i])) {
                return i;
            }
        }
        return -1;
    }

    /** Copies the first {@code n} queued elements into {@code dest}; needs both locks. */
    private void copyInto(Object[] dest, int n) {
        for (int i = takeIndex, k = 0; k < n; k++, i = inc(i)) {
            dest[k] = items[i];
        }
    }

    /** Deletes the element at buffer index {@code removeIndex}; needs both locks. */
    private void removeAt(final int removeIndex) {
        if (removeIndex == takeIndex) {
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
        } else {
            // Interior element: slide every later element one slot towards the head.
            int i = removeIndex;
            while (true) {
                final int next = inc(i);
                if (next == putIndex) {
                    items[i] = null;
                    putIndex = i;
                    break;
                }
                items[i] = items[next];
                i = next;
            }
        }
        if (count.getAndDecrement() == items.length) {
            notFull.signal(); // putLock is held by the caller
        }
    }

    /**
     * Shared implementation of {@code removeAll} / {@code retainAll}: rebuilds the buffer with only
     * the surviving elements.
     *
     * @param removeMatches {@code true} to drop elements contained in {@code c}, {@code false} to
     *     drop elements not contained in {@code c}
     * @return {@code true} if the queue changed
     */
    private boolean bulkRemove(Collection<?> c, boolean removeMatches) {
        if (c == null) {
            throw new NullPointerException();
        }
        fullyLock();
        try {
            final int n = count.get();
            final Object[] kept = new Object[n];
            int keptCount = 0;
            for (int i = takeIndex, k = 0; k < n; k++, i = inc(i)) {
                final Object x = items[i];
                if (c.contains(x) != removeMatches) {
                    kept[keptCount++] = x;
                }
            }
            if (keptCount == n) {
                return false;
            }
            for (int i = 0; i < items.length; i++) {
                items[i] = null;
            }
            System.arraycopy(kept, 0, items, 0, keptCount);
            takeIndex = 0;
            putIndex = keptCount; // keptCount < n <= capacity, so this is a valid index
            count.set(keptCount);
            if (n == items.length) {
                notFull.signal();
            }
            return true;
        } finally {
            fullyUnlock();
        }
    }
}
/**
 * Producer/consumer stress test for a {@link BlockingQueue}: {@code nPairs} producers each put
 * {@code nTrials} pseudo-random integers while {@code nPairs} consumers each take {@code nTrials}
 * integers, and the run is valid only if the sum of everything put equals the sum of everything
 * taken.
 *
 * @author zysaaa
 */
public class PutTakeTest {

    /** Shared pool that runs every producer and consumer task. */
    public static ExecutorService pool = Executors.newCachedThreadPool();

    /** Rendezvous for all workers plus the coordinating thread (start line and finish line). */
    protected CyclicBarrier barrier;

    /** The queue under test. */
    protected BlockingQueue<Integer> bb;

    protected final int nTrials, nPairs;

    /** Running checksum of all values handed to the queue. */
    protected final AtomicInteger putSum = new AtomicInteger(0);

    /** Running checksum of all values received from the queue. */
    protected final AtomicInteger takeSum = new AtomicInteger(0);

    /**
     * @param capacity not used by this class; the supplied queue decides its own capacity
     * @param npairs number of producer/consumer pairs
     * @param ntrials number of elements each producer puts and each consumer takes
     * @param bb the queue to exercise
     */
    public PutTakeTest(int capacity, int npairs, int ntrials, BlockingQueue<Integer> bb) {
        this.bb = bb;
        this.nPairs = npairs;
        this.nTrials = ntrials;
        // Every worker plus the thread that calls test() meets at the barrier.
        this.barrier = new CyclicBarrier(2 * npairs + 1);
    }

    /**
     * Launches all workers, waits for them to start and to finish, then compares the checksums.
     *
     * @throws RuntimeException wrapping any failure, including a checksum mismatch
     */
    void test() {
        try {
            int launched = 0;
            while (launched < nPairs) {
                pool.execute(new Producer());
                pool.execute(new Consumer());
                launched++;
            }
            barrier.await(); // start line: all workers are ready
            barrier.await(); // finish line: all workers are done
            final boolean balanced = putSum.get() == takeSum.get();
            if (!balanced) {
                throw new RuntimeException("Result invalid");
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Produces the next value of a cheap xorshift pseudo-random sequence. */
    static int xorShift(int y) {
        final int afterLeft6 = y ^ (y << 6);
        final int afterRight21 = afterLeft6 ^ (afterLeft6 >>> 21);
        return afterRight21 ^ (afterRight21 << 7);
    }

    /** Puts {@code nTrials} pseudo-random values and adds their sum to {@code putSum}. */
    class Producer implements Runnable {
        public void run() {
            try {
                int value = (this.hashCode() ^ (int) System.nanoTime());
                barrier.await();
                int total = 0;
                for (int remaining = nTrials; remaining > 0; remaining--) {
                    bb.put(value);
                    total += value;
                    value = xorShift(value);
                }
                putSum.getAndAdd(total);
                barrier.await();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /** Takes {@code nTrials} values and adds their sum to {@code takeSum}. */
    class Consumer implements Runnable {
        public void run() {
            try {
                barrier.await();
                int total = 0;
                int remaining = nTrials;
                while (remaining > 0) {
                    total += bb.take();
                    remaining--;
                }
                takeSum.getAndAdd(total);
                barrier.await();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}
- import java.util.concurrent.*;
- /**
- * @description:
- * @author:zysaaa
- * @date: 2021/10/16 20:17
- */
- public class TimedPutTakeTest extends PutTakeTest {
- private BarrierTimer timer = new BarrierTimer();
- public TimedPutTakeTest(int cap, int pairs, int trials, BlockingQueue linkedBlockingQueue) {
- super(cap, pairs, trials, linkedBlockingQueue);
- barrier = new CyclicBarrier(nPairs * 2 + 1, timer);
- }
- public void test() {
- try {
- timer.clear();
- for (int i = 0; i < nPairs; i++) {
- pool.execute(new PutTakeTest.Producer());
- pool.execute(new PutTakeTest.Consumer());
- }
- barrier.await();
- barrier.await();
- if (putSum.get() != takeSum.get()) {
- throw new RuntimeException("Result invalid");
- }
- long nsPerItem = timer.getTime() / (nPairs * (long) nTrials);
- System.out.print("Throughput: " + nsPerItem + " ns/item");
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- public static void main(String[] args) throws Exception {
- int tpt = 100000; // trials per thread
- for (int cap = 10; cap <= 1000; cap *= 10) {
- System.out.println("Capacity: " + cap);
- for (int pairs = 1; pairs <= 128; pairs *= 2) {
- TimedPutTakeTest t = new TimedPutTakeTest(cap, pairs, tpt, new ArrayBlockingQueue(cap));
- //TimedPutTakeTest t = new TimedPutTakeTest(cap, pairs, tpt, new ArrayBlockingQueueTwoLocks(cap));
- System.out.print("Pairs: " + pairs + "\t");
- t.test();
- System.out.print("\t");
- Thread.sleep(1000);
- t.test();
- System.out.println();
- Thread.sleep(1000);
- }
- }
- PutTakeTest.pool.shutdown();
- }
- }
/**
 * Two-shot stopwatch driven by {@link #run()}: the first call after {@link #clear()} records the
 * start time, and every later call overwrites the end time. All methods are synchronized.
 */
public class BarrierTimer implements Runnable {

    /** True once the start timestamp of the current measurement has been taken. */
    private boolean running;

    private long beginNanos;
    private long finishNanos;

    /** Records the start time on the first call of a measurement, the end time afterwards. */
    public synchronized void run() {
        final long now = System.nanoTime();
        if (running) {
            finishNanos = now;
            return;
        }
        running = true;
        beginNanos = now;
    }

    /** Arms the timer so that the next {@link #run()} records a new start time. */
    public synchronized void clear() {
        running = false;
    }

    /** Returns the recorded end time minus the recorded start time, in nanoseconds. */
    public synchronized long getTime() {
        return finishNanos - beginNanos;
    }
}
Add Comment
Please, Sign In to add comment