dreamworker

Untitled

Jun 22nd, 2020
258
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 4.60 KB | None | 0 0
  1. // =========================================================================
  2. 1. Реализовать программу, в которой два потока поочередно пишут ping и pong.
  3. // =========================================================================
  4.  
  5.  
  6.  
  7. package ru.home.geekbrains.threading.data;
  8.  
  9. public enum Message {
  10.     PING("ping"),
  11.     PONG("pong");
  12.  
  13.     private String message;
  14.  
  15.     Message(String message) {
  16.         this.message = message;
  17.     }
  18. }
  19.  
  20.  
  21.  
  22.  
  23. // ===============================================================
  24.  
  25.  
  26. package ru.home.geekbrains.threading.model;
  27.  
  28. import ru.home.geekbrains.threading.data.Message;
  29. import java.util.concurrent.SynchronousQueue;
  30.  
  31. // домашний синглетончик
  32. public enum Storage {
  33.  
  34.     INSTANCE;
  35.  
  36.     SynchronousQueue<Message> queue = new SynchronousQueue<>();
  37.  
  38.     public SynchronousQueue<Message> getQueue() {
  39.         return queue;
  40.     }
  41. }
  42.  
  43.  
  44. // ===============================================================
  45.  
  46.  
  47.  
  48. package ru.home.geekbrains.threading;
  49.  
  50. import java.util.concurrent.locks.ReadWriteLock;
  51. import java.util.concurrent.locks.ReentrantReadWriteLock;
  52.  
  53. public class MyCounter {
  54.  
  55.     private ReadWriteLock lock = new ReentrantReadWriteLock();
  56.  
  57.     private long counter;
  58.  
  59.  
  60.     public long get() {
  61.  
  62.         long result;
  63.  
  64.         try {
  65.             lock.readLock().lock();
  66.             result = counter;
  67.         }
  68.         finally {
  69.             lock.readLock().unlock();
  70.         }
  71.  
  72.         return result;
  73.     }
  74.  
  75.  
  76.     public long incrementAndGet() {
  77.  
  78.         long result;
  79.  
  80.         try {
  81.             lock.writeLock().lock();
  82.             result = ++counter;
  83.         }
  84.         finally {
  85.             lock.writeLock().unlock();
  86.         }
  87.         return result;
  88.     }
  89.  
  90.  
  91.     public long decrementAndGet() {
  92.  
  93.         long result;
  94.  
  95.         try {
  96.             lock.writeLock().lock();
  97.             result = --counter;
  98.         }
  99.         finally {
  100.             lock.writeLock().unlock();
  101.         }
  102.         return result;
  103.     }
  104.  
  105. }
  106.  
  107.  
  108.  
  109. // ===============================================================
  110.  
  111.  
  112.  
  113. package ru.home.geekbrains.threading;
  114.  
  115. import lombok.extern.slf4j.Slf4j;
  116. import ru.home.geekbrains.threading.data.Message;
  117.  
  118. import java.util.concurrent.BlockingQueue;
  119. import java.util.concurrent.TimeUnit;
  120.  
  121. @Slf4j
  122. public abstract class AbstractThread extends Thread {
  123.  
  124.     BlockingQueue<Message> queue;
  125.     MyCounter rx = new MyCounter();
  126.     MyCounter tx = new MyCounter();
  127.  
  128.     public AbstractThread(BlockingQueue<Message> queue) {
  129.         this.queue = queue;
  130.     }
  131.  
  132.  
  133.     @Override
  134.     public void run() {
  135.  
  136.         Message message = null;
  137.         while(!Thread.currentThread().isInterrupted()) {
  138.             try {
  139.                 TimeUnit.MILLISECONDS.sleep(500);
  140.                 communicateLogic();
  141.  
  142.                 log.info("{} - Rx/Tx: {}/{}", Thread.currentThread().getName(), rx.get(), tx.get());
  143.  
  144.             } catch (InterruptedException e) {
  145.                 Thread.currentThread().interrupt();
  146.             }
  147.         }
  148.     }
  149.  
  150.     protected void sendMessage(Message message) throws InterruptedException {
  151.         log.info("{} - send: {}", Thread.currentThread().getName(), message);
  152.         queue.put(message);
  153.         tx.incrementAndGet();
  154.     }
  155.  
  156.     protected Message readMessage() throws InterruptedException {
  157.         Message result = queue.take();
  158.         log.info("{} - received: {}", Thread.currentThread().getName(), result);
  159.         rx.incrementAndGet();
  160.         return result;
  161.     }
  162.  
  163.     protected abstract void communicateLogic() throws InterruptedException;
  164.  
  165.  
  166. }
  167.  
  168.  
  169.  
  170.  
  171.  
  172.  
  173. // ===============================================================
  174.  
  175.  
  176.  
  177.  
  178.  
  179.  
  180. package ru.home.geekbrains.threading;
  181.  
  182. import ru.home.geekbrains.threading.data.Message;
  183.  
  184. import java.util.concurrent.SynchronousQueue;
  185.  
  186. public class MasterThread extends AbstractThread {
  187.  
  188.     public MasterThread(SynchronousQueue<Message> queue) {
  189.         super(queue);
  190.         setName("Master");
  191.     }
  192.  
  193.     @Override
  194.     protected void communicateLogic() throws InterruptedException {
  195.         Message message;
  196.         sendMessage(Message.PING); // send
  197.         message = readMessage();   // receive
  198.     }
  199. }
  200.  
  201.  
  202. // ===============================================================
  203.  
  204.  
  205. package ru.home.geekbrains.threading;
  206.  
  207. import ru.home.geekbrains.threading.data.Message;
  208. import java.util.concurrent.SynchronousQueue;
  209.  
  210. public class SlaveThread extends AbstractThread {
  211.  
  212.     public SlaveThread(SynchronousQueue<Message> queue) {
  213.         super(queue);
  214.         setName("Slave");
  215.     }
  216.  
  217.     @Override
  218.     protected void communicateLogic() throws InterruptedException {
  219.  
  220.         Message message;
  221.         message = readMessage();   // receive
  222.         sendMessage(Message.PONG); // send
  223.     }
  224. }
  225.  
  226.  
  227.  
  228.  
  229.  
  230. // ===============================================================
  231.  
  232.  
  233.  
  234.  
  235. import org.springframework.boot.SpringApplication;
  236. import org.springframework.boot.autoconfigure.SpringBootApplication;
  237.  
  238. @SpringBootApplication
  239. public class ThreadingApplication {
  240.  
  241.     public static void main(String[] args) {
  242.         SpringApplication.run(ThreadingApplication.class, args);
  243.     }
  244.  
  245. }
  246.  
  247.  
  248. // ===============================================================
  249.  
  250.  
  251.  
  252. package ru.home.geekbrains.threading;
  253.  
  254. import org.springframework.boot.ApplicationArguments;
  255. import org.springframework.boot.ApplicationRunner;
  256. import org.springframework.stereotype.Component;
  257. import ru.home.geekbrains.threading.model.Storage;
  258.  
  259. @Component
  260. public class ThreadApplicationRunner implements ApplicationRunner {
  261.  
  262.  
  263.     @Override
  264.     public void run(ApplicationArguments args) throws Exception {
  265.  
  266.         AbstractThread masterThread = new MasterThread(Storage.INSTANCE.getQueue());
  267.         masterThread.start();
  268.  
  269.         AbstractThread slaveThread = new SlaveThread(Storage.INSTANCE.getQueue());
  270.         slaveThread.start();
  271.     }
  272. }
Advertisement
Add Comment
Please, Sign In to add comment