Guest User

Untitled

a guest
Mar 20th, 2018
80
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.26 KB | None | 0 0
  1. package com.go2going;
  2.  
  3. import org.junit.Test;
  4.  
  5. import java.util.LinkedList;
  6. import java.util.concurrent.CountDownLatch;
  7. import java.util.concurrent.LinkedBlockingQueue;
  8. import java.util.concurrent.locks.Lock;
  9. import java.util.concurrent.locks.ReentrantLock;
  10.  
  11. /**
  12. * @author BlueT
  13. * 2018/3/3 20:26
  14. */
  15. public class ProductAndConsumerTest {
  16.  
  17. @Test
  18. public void test1() throws InterruptedException {
  19. // IBlockingQueue<Integer> waitNotify = new WaitNotify<>(2);
  20. IBlockingQueue<Integer> waitNotify = new Queue<>();
  21. // IBlockingQueue<Integer> waitNotify = new Condition<>(2);
  22.  
  23. CountDownLatch countDownLatch = new CountDownLatch(10);
  24.  
  25. Runnable runnable = () -> {
  26. while (true) {
  27. try {
  28. System.out.println(Thread.currentThread().getName() + ":" + waitNotify.take());
  29. countDownLatch.countDown();
  30. } catch (InterruptedException e) {
  31. System.out.println(e);
  32. }
  33. }
  34.  
  35. };
  36. Thread thread = new Thread(runnable, "t1");
  37. thread.start();
  38. Thread thread1 = new Thread(runnable, "t2");
  39. thread1.start();
  40.  
  41. new Thread(()->{
  42. try {
  43. for (int i = 0; i < 10; i++) {
  44. waitNotify.put(i);
  45. }
  46. } catch (InterruptedException e) {
  47. e.printStackTrace();
  48. }
  49. }).start();
  50.  
  51. countDownLatch.await();
  52. thread.interrupt();thread1.interrupt();
  53. System.out.println("Close");
  54. }
  55. }
  56.  
  57. interface IBlockingQueue<T>{
  58. void put(T data) throws InterruptedException;
  59.  
  60. T take() throws InterruptedException;
  61. }
  62.  
  63. /**
  64. * 第一种使用wait,notifyAll的方式,使用这两个方法前得获得对象的监视器,查看方法描述
  65. * @param <T>
  66. */
  67. class WaitNotify<T> implements IBlockingQueue<T>{
  68.  
  69. private int size;
  70. private final LinkedList<T> linkedList = new LinkedList<>();
  71. private final Object lock = new Object();
  72.  
  73. public WaitNotify(int size) {
  74. this.size = size;
  75. }
  76.  
  77. @Override
  78. public void put(T data) throws InterruptedException {
  79. synchronized (lock) {
  80. while (linkedList.size() > size) {
  81. lock.wait();
  82. }
  83.  
  84. linkedList.addLast(data);
  85. lock.notifyAll();
  86. }
  87. }
  88.  
  89. @Override
  90. public T take() throws InterruptedException {
  91. synchronized (lock) {
  92. while (linkedList.size() <= 0) {
  93. lock.wait();
  94. }
  95.  
  96. T t = linkedList.removeFirst();
  97.  
  98. lock.notifyAll();
  99. return t;
  100. }
  101. }
  102. }
  103.  
  104.  
  105. /**
  106. * lock相当于是synchronize,condition相当于是上面的object
  107. * @param <T>
  108. */
  109. class Condition<T> implements IBlockingQueue<T>{
  110.  
  111. private LinkedList<T> linkedList = new LinkedList<>();
  112. private final Lock lock = new ReentrantLock();
  113.  
  114. private final java.util.concurrent.locks.Condition read = lock.newCondition();
  115. private final java.util.concurrent.locks.Condition write = lock.newCondition();
  116.  
  117. private int size;
  118.  
  119. public Condition(int size){
  120. this.size = size;
  121. }
  122.  
  123.  
  124. @Override
  125. public void put(T data) throws InterruptedException {
  126.  
  127.  
  128. try {
  129. lock.lock();
  130.  
  131. while (linkedList.size() >= size) {
  132. // 将释放锁lock
  133. write.await();
  134. }
  135.  
  136. linkedList.addLast(data);
  137. // 唤醒读线程
  138. read.signalAll();
  139. } finally {
  140. lock.unlock();
  141. }
  142.  
  143.  
  144.  
  145. }
  146.  
  147. @Override
  148. public T take() throws InterruptedException {
  149.  
  150. try {
  151. lock.lock();
  152. while (linkedList.size() == 0) {
  153. // 将释放锁lock
  154. read.await();
  155. }
  156.  
  157. T first = linkedList.removeFirst();
  158. // 唤醒写线程
  159. write.signalAll();
  160.  
  161. return first;
  162. }finally {
  163. lock.unlock();
  164. }
  165. }
  166. }
  167.  
  168.  
  169. class Queue<T> implements IBlockingQueue<T>{
  170.  
  171.  
  172. private LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>(2);
  173.  
  174.  
  175. @Override
  176. public void put(T data) throws InterruptedException {
  177. queue.put(data);
  178. }
  179.  
  180. @Override
  181. public T take() throws InterruptedException {
  182. return queue.take();
  183. }
  184. }
Add Comment
Please, Sign In to add comment