Advertisement
Guest User

Untitled

a guest
Apr 9th, 2020
180
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 2.00 KB | None | 0 0
  1. package ru.ifmo.rain.naumov.concurrent;
  2.  
  3. import info.kgeorgiy.java.advanced.mapper.ParallelMapper;
  4.  
  5. import java.util.*;
  6. import java.util.concurrent.atomic.AtomicInteger;
  7. import java.util.function.Function;
  8.  
  9. public class ParallelMapperImpl implements ParallelMapper {
  10.     private final Queue<Runnable> queue;
  11.     private List<Thread> threads;
  12.  
  13.     public ParallelMapperImpl(int numberOfThreads) {
  14.         this.queue = new ArrayDeque<>();
  15.         this.threads = Collections.nCopies(numberOfThreads, new Thread(() -> {
  16.             try {
  17.                 while (!Thread.interrupted())
  18.                     compute();
  19.             } catch (InterruptedException ignored) {
  20.             }
  21.         }));
  22.         threads.forEach(Thread::start);
  23.     }
  24.  
  25.     private void compute() throws InterruptedException {
  26.         Runnable current;
  27.         synchronized (queue) {
  28.             while (queue.isEmpty()) {
  29.                 queue.wait();
  30.             }
  31.             current = queue.poll();
  32.             queue.notifyAll();
  33.         }
  34.         current.run();
  35.     }
  36.  
  37.     @Override
  38.     public <T, R> List<R> map(Function<? super T, ? extends R> f, List<? extends T> args) throws InterruptedException {
  39.         List<R> result = new ArrayList<>(Collections.nCopies(args.size(), null));
  40.         AtomicInteger size = new AtomicInteger();
  41.         for (int i = 0; i < args.size(); ++i) {
  42.             int finalI = i;
  43.             queue.add(() -> {
  44.                 result.set(finalI, f.apply(args.get(finalI)));
  45.                 synchronized (size) {
  46.                     size.getAndIncrement();
  47.                     size.notifyAll();
  48.                 }
  49.             });
  50.             synchronized (queue) {
  51.                 queue.notifyAll();
  52.             }
  53.  
  54.         }
  55.         synchronized (size) {
  56.             while (size.get() != args.size()) {
  57.                 size.wait();
  58.             }
  59.         }
  60.         return result;
  61.     }
  62.  
  63.     @Override
  64.     public void close() {
  65.         threads.forEach(Thread::interrupt);
  66.     }
  67. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement