zysaaa

ArrayBlockingQueueUsingTwoLock and test

Oct 17th, 2021 (edited)
524
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 9.90 KB | None | 0 0
  1.  
  2. import java.util.Collection;
  3. import java.util.Iterator;
  4. import java.util.concurrent.BlockingQueue;
  5. import java.util.concurrent.TimeUnit;
  6. import java.util.concurrent.atomic.AtomicInteger;
  7. import java.util.concurrent.locks.Condition;
  8. import java.util.concurrent.locks.ReentrantLock;
  9.  
  10. public class ArrayBlockingQueueTwoLocks<E> implements BlockingQueue<E> {
  11.  
  12.     final Object[] items;
  13.  
  14.     int takeIndex;
  15.  
  16.     int putIndex;
  17.  
  18.     private final AtomicInteger count = new AtomicInteger();
  19.  
  20.     private final ReentrantLock takeLock = new ReentrantLock();
  21.  
  22.     private final Condition notEmpty = takeLock.newCondition();
  23.  
  24.     private final ReentrantLock putLock = new ReentrantLock();
  25.  
  26.     private final Condition notFull = putLock.newCondition();
  27.  
  28.     public ArrayBlockingQueueTwoLocks(int capacity) {
  29.         if (capacity <= 0)
  30.             throw new IllegalArgumentException();
  31.         this.items = new Object[capacity];
  32.     }
  33.  
  34.     public void put(E e) throws InterruptedException {
  35.         if (e == null) throw new NullPointerException();
  36.         int c;
  37.         final ReentrantLock putLock = this.putLock;
  38.         final AtomicInteger count = this.count;
  39.         putLock.lockInterruptibly();
  40.         try {
  41.             while (count.get() == items.length) {
  42.                 notFull.await();
  43.             }
  44.             enqueue(e);
  45.             c = count.getAndIncrement();
  46.             if (c + 1 < items.length)
  47.                 notFull.signal();
  48.         } finally {
  49.             putLock.unlock();
  50.         }
  51.         if (c == 0)
  52.             signalNotEmpty();
  53.     }
  54.  
  55.     public E take() throws InterruptedException {
  56.         E x;
  57.         int c = -1;
  58.         final AtomicInteger count = this.count;
  59.         final ReentrantLock takeLock = this.takeLock;
  60.         takeLock.lockInterruptibly();
  61.         try {
  62.             while (count.get() == 0) {
  63.                 notEmpty.await();
  64.             }
  65.             x = dequeue();
  66.             c = count.getAndDecrement();
  67.             if (c > 1)
  68.                 notEmpty.signal();
  69.         } finally {
  70.             takeLock.unlock();
  71.         }
  72.         if (c == items.length)
  73.             signalNotFull();
  74.         return x;
  75.     }
  76.  
  77.     @Override
  78.     public boolean isEmpty() {
  79.         return count.get() == 0;
  80.     }
  81.  
  82.     private void signalNotEmpty() {
  83.         final ReentrantLock takeLock = this.takeLock;
  84.         takeLock.lock();
  85.         try {
  86.             notEmpty.signal();
  87.         } finally {
  88.             takeLock.unlock();
  89.         }
  90.     }
  91.  
  92.     private void enqueue(E x) {
  93.         final Object[] items = this.items;
  94.         items[putIndex] = x;
  95.         if (++putIndex == items.length)
  96.             putIndex = 0;
  97.     }
  98.  
  99.     @SuppressWarnings("unchecked")
  100.     private E dequeue() {
  101.         final Object[] items = this.items;
  102.         Object x = items[takeIndex];
  103.         items[takeIndex] = null;
  104.         if (++takeIndex == items.length)
  105.             takeIndex = 0;
  106.         return (E) x;
  107.     }
  108.  
  109.     private void signalNotFull() {
  110.         final ReentrantLock putLock = this.putLock;
  111.         putLock.lock();
  112.         try {
  113.             notFull.signal();
  114.         } finally {
  115.             putLock.unlock();
  116.         }
  117.     }
  118.  
  119.     @Override
  120.     public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
  121.         return false;
  122.     }
  123.  
  124.     @Override
  125.     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  126.         return null;
  127.     }
  128.  
  129.     @Override
  130.     public int remainingCapacity() {
  131.         return 0;
  132.     }
  133.  
  134.     @Override
  135.     public boolean remove(Object o) {
  136.         return false;
  137.     }
  138.  
  139.     @Override
  140.     public boolean containsAll(Collection<?> c) {
  141.         return false;
  142.     }
  143.  
  144.     @Override
  145.     public boolean addAll(Collection<? extends E> c) {
  146.         return false;
  147.     }
  148.  
  149.     @Override
  150.     public boolean removeAll(Collection<?> c) {
  151.         return false;
  152.     }
  153.  
  154.     @Override
  155.     public boolean retainAll(Collection<?> c) {
  156.         return false;
  157.     }
  158.  
  159.     @Override
  160.     public void clear() {}
  161.  
  162.     @Override
  163.     public boolean contains(Object o) {
  164.         return false;
  165.     }
  166.  
  167.     @Override
  168.     public Iterator<E> iterator() {
  169.         return null;
  170.     }
  171.  
  172.     @Override
  173.     public Object[] toArray() {
  174.         return new Object[0];
  175.     }
  176.  
  177.     @Override
  178.     public <T> T[] toArray(T[] a) {
  179.         return null;
  180.     }
  181.  
  182.     @Override
  183.     public int drainTo(Collection<? super E> c) {
  184.         return 0;
  185.     }
  186.  
  187.     @Override
  188.     public int drainTo(Collection<? super E> c, int maxElements) {
  189.         return 0;
  190.     }
  191.  
  192.     @Override
  193.     public int size() {
  194.         return 0;
  195.     }
  196.  
  197.     @Override
  198.     public boolean add(E e) {
  199.         return false;
  200.     }
  201.  
  202.     @Override
  203.     public boolean offer(E e) {
  204.         return false;
  205.     }
  206.  
  207.     @Override
  208.     public E remove() {
  209.         return null;
  210.     }
  211.  
  212.     @Override
  213.     public E poll() {
  214.         return null;
  215.     }
  216.  
  217.     @Override
  218.     public E element() {
  219.         return null;
  220.     }
  221.  
  222.     @Override
  223.     public E peek() {
  224.         return null;
  225.     }
  226. }
  227.  
  228.  
  229.  
  230. /**
  231.  * @description:
  232.  * @author:zysaaa
  233.  * @date: 2021/10/16 20:33
  234.  */
  235. public class PutTakeTest {
  236.     public static ExecutorService pool = Executors.newCachedThreadPool();
  237.     protected CyclicBarrier barrier;
  238.     protected BlockingQueue<Integer> bb;
  239.     protected final int nTrials, nPairs;
  240.     protected final AtomicInteger putSum = new AtomicInteger(0);
  241.     protected final AtomicInteger takeSum = new AtomicInteger(0);
  242.  
  243.  
  244.     public PutTakeTest(int capacity, int npairs, int ntrials, BlockingQueue<Integer> bb) {
  245. //        this.bb = new LinkedBlockingQueue<Integer>(capacity);
  246.         this.nTrials = ntrials;
  247.         this.nPairs = npairs;
  248.         this.barrier = new CyclicBarrier(npairs * 2 + 1);
  249.         this.bb = bb;
  250.     }
  251.  
  252.     void test() {
  253.         try {
  254.             for (int i = 0; i < nPairs; i++) {
  255.                 pool.execute(new Producer());
  256.                 pool.execute(new Consumer());
  257.             }
  258.             barrier.await(); // wait for all threads to be ready
  259.             barrier.await(); // wait for all threads to finish
  260.             if (putSum.get() != takeSum.get()) {
  261.                 throw new RuntimeException("Result invalid");
  262.             }
  263.         } catch (Exception e) {
  264.             throw new RuntimeException(e);
  265.         }
  266.     }
  267.  
  268.     static int xorShift(int y) {
  269.         y ^= (y << 6);
  270.         y ^= (y >>> 21);
  271.         y ^= (y << 7);
  272.         return y;
  273.     }
  274.  
  275.     class Producer implements Runnable {
  276.         public void run() {
  277.             try {
  278.                 int seed = (this.hashCode() ^ (int) System.nanoTime());
  279.                 int sum = 0;
  280.                 barrier.await();
  281.                 for (int i = nTrials; i > 0; --i) {
  282.                     bb.put(seed);
  283.                     sum += seed;
  284.                     seed = xorShift(seed);
  285.                 }
  286.                 putSum.getAndAdd(sum);
  287.                 barrier.await();
  288.             } catch (Exception e) {
  289.                 throw new RuntimeException(e);
  290.             }
  291.         }
  292.     }
  293.  
  294.     class Consumer implements Runnable {
  295.         public void run() {
  296.             try {
  297.                 barrier.await();
  298.                 int sum = 0;
  299.                 for (int i = nTrials; i > 0; --i) {
  300.                     sum += bb.take();
  301.                 }
  302.                 takeSum.getAndAdd(sum);
  303.                 barrier.await();
  304.             } catch (Exception e) {
  305.                 throw new RuntimeException(e);
  306.             }
  307.         }
  308.     }
  309. }
  310.  
  311. import java.util.concurrent.*;
  312.  
  313. /**
  314.  * @description:
  315.  * @author:zysaaa
  316.  * @date: 2021/10/16 20:17
  317.  */
  318. public class TimedPutTakeTest extends PutTakeTest {
  319.     private BarrierTimer timer = new BarrierTimer();
  320.  
  321.     public TimedPutTakeTest(int cap, int pairs, int trials, BlockingQueue linkedBlockingQueue) {
  322.         super(cap, pairs, trials, linkedBlockingQueue);
  323.         barrier = new CyclicBarrier(nPairs * 2 + 1, timer);
  324.     }
  325.  
  326.     public void test() {
  327.         try {
  328.             timer.clear();
  329.             for (int i = 0; i < nPairs; i++) {
  330.                 pool.execute(new PutTakeTest.Producer());
  331.                 pool.execute(new PutTakeTest.Consumer());
  332.             }
  333.             barrier.await();
  334.             barrier.await();
  335.             if (putSum.get() != takeSum.get()) {
  336.                 throw new RuntimeException("Result invalid");
  337.             }
  338.             long nsPerItem = timer.getTime() / (nPairs * (long) nTrials);
  339.             System.out.print("Throughput: " + nsPerItem + " ns/item");
  340.         } catch (Exception e) {
  341.             throw new RuntimeException(e);
  342.         }
  343.     }
  344.  
  345.     public static void main(String[] args) throws Exception {
  346.         int tpt = 100000; // trials per thread
  347.         for (int cap = 10; cap <= 1000; cap *= 10) {
  348.             System.out.println("Capacity: " + cap);
  349.             for (int pairs = 1; pairs <= 128; pairs *= 2) {
  350.                 TimedPutTakeTest t = new TimedPutTakeTest(cap, pairs, tpt, new ArrayBlockingQueue(cap));
  351.                 //TimedPutTakeTest t = new TimedPutTakeTest(cap, pairs, tpt, new ArrayBlockingQueueTwoLocks(cap));
  352.                 System.out.print("Pairs: " + pairs + "\t");
  353.                 t.test();
  354.                 System.out.print("\t");
  355.                 Thread.sleep(1000);
  356.                 t.test();
  357.                 System.out.println();
  358.                 Thread.sleep(1000);
  359.             }
  360.         }
  361.         PutTakeTest.pool.shutdown();
  362.  
  363.     }
  364. }
  365.  
  366.  
  367. public class BarrierTimer implements Runnable {
  368.     private boolean started;
  369.     private long startTime, endTime;
  370.  
  371.     public synchronized void run() {
  372.         long t = System.nanoTime();
  373.         if (!started) {
  374.             started = true;
  375.             startTime = t;
  376.         } else
  377.             endTime = t;
  378.     }
  379.  
  380.     public synchronized void clear() {
  381.         started = false;
  382.     }
  383.  
  384.     public synchronized long getTime() {
  385.         return endTime - startTime;
  386.     }
  387. }
  388.  
Add Comment
Please, Sign In to add comment