Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package ru.belyaev.multithreading;
- import ru.belyaev.multithreading.queue.IThreadSafeQueue;
- import ru.belyaev.multithreading.queue.ThreadSafePriorQueue;
- import ru.belyaev.multithreading.thread.ReaderThread;
- import ru.belyaev.multithreading.thread.WriterThread;
- public class ThreadRunner {
- private ReaderThread reader;
- private WriterThread writer;
- ThreadRunner(long readerExecutionInterval) {
- final IThreadSafeQueue<Integer> queue = new ThreadSafePriorQueue<Integer>(Integer::compareTo);
- reader = new ReaderThread(queue, readerExecutionInterval);
- writer = new WriterThread(this, queue);
- }
- private void start() {
- reader.start();
- writer.start();
- System.out.println("Program has started");
- }
- public void stop() {
- System.out.println("Stopping program");
- reader.interrupt();
- writer.interrupt();
- }
- public static void main(String[] args) {
- long readerExecutionInterval = 3000;
- new ThreadRunner(readerExecutionInterval).start();
- }
- }
- package ru.belyaev.multithreading.thread;
- import ru.belyaev.multithreading.queue.IThreadSafeQueue;
- /**
- * Created by ABelyaev on 19.07.2017.
- */
- public class ReaderThread extends Thread{
- private final IThreadSafeQueue queue;
- private long executionInterval;
- public ReaderThread(IThreadSafeQueue queue, long executionInterval) {
- this.queue = queue;
- this.executionInterval = executionInterval;
- }
- @Override
- public void run() {
- while (!Thread.currentThread().isInterrupted()) {
- try {
- Integer x = (Integer)queue.removeMin();
- if (null != x) {
- System.out.println(x + " was removed from queue: " + queue);
- }
- sleep(executionInterval);
- } catch (InterruptedException e) {
- return;
- }
- }
- }
- }
- package ru.belyaev.multithreading.thread;
- import ru.belyaev.multithreading.ThreadRunner;
- import ru.belyaev.multithreading.queue.IThreadSafeQueue;
- import java.util.Scanner;
- /**
- * Created by ABelyaev on 19.07.2017.
- */
- public class WriterThread extends Thread {
- private final IThreadSafeQueue<Integer> queue;
- private ThreadRunner runner;
- private final String CMD_STOP = "stop";
- public WriterThread(ThreadRunner runner, IThreadSafeQueue<Integer> queue) {
- this.runner = runner;
- this.queue = queue;
- }
- @Override
- public void run() {
- while (!interrupted()) {
- Scanner scanner = new Scanner(System.in);
- String readString;
- while (null != (readString = scanner.nextLine()) && !Thread.currentThread().isInterrupted()) {
- if (!readString.isEmpty()) {
- if (CMD_STOP.equalsIgnoreCase(readString)) {
- runner.stop();
- } else {
- try {
- int x = Integer.valueOf(readString);
- System.out.println(x + (queue.add(x) ? "was added" : "was not added"));
- System.out.println("current queue: " + queue);
- } catch (NumberFormatException e) {
- System.out.println("Provided value is not a valid number");
- }
- }
- }
- }
- }
- }
- }
- package ru.belyaev.multithreading.queue;
- import java.util.Comparator;
- import java.util.PriorityQueue;
- /**
- * Created by ABelyaev on 19.07.2017.
- */
- public class ThreadSafePriorQueue<Number> implements IThreadSafeQueue<Number> {
- private final PriorityQueue<Number> queue;
- public ThreadSafePriorQueue(Comparator<? super Number> comparator) {
- queue = new PriorityQueue<>(comparator);
- }
- public boolean add(Number e) {
- synchronized (this) {
- return queue.add(e);
- }
- }
- public Number removeMin() {
- synchronized (this) {
- return queue.poll();
- }
- }
- @Override
- public String toString() {
- return queue.toString();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment