Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package ru.ifmo.rain.naumov.concurrent;
- import info.kgeorgiy.java.advanced.mapper.ParallelMapper;
- import java.util.*;
- import java.util.concurrent.atomic.AtomicInteger;
- import java.util.function.Function;
- public class ParallelMapperImpl implements ParallelMapper {
- private final Queue<Runnable> queue;
- private List<Thread> threads;
- public ParallelMapperImpl(int numberOfThreads) {
- this.queue = new ArrayDeque<>();
- this.threads = Collections.nCopies(numberOfThreads, new Thread(() -> {
- try {
- while (!Thread.interrupted())
- compute();
- } catch (InterruptedException ignored) {
- }
- }));
- threads.forEach(Thread::start);
- }
- private void compute() throws InterruptedException {
- Runnable current;
- synchronized (queue) {
- while (queue.isEmpty()) {
- queue.wait();
- }
- current = queue.poll();
- queue.notifyAll();
- }
- current.run();
- }
- @Override
- public <T, R> List<R> map(Function<? super T, ? extends R> f, List<? extends T> args) throws InterruptedException {
- List<R> result = new ArrayList<>(Collections.nCopies(args.size(), null));
- AtomicInteger size = new AtomicInteger();
- for (int i = 0; i < args.size(); ++i) {
- int finalI = i;
- queue.add(() -> {
- result.set(finalI, f.apply(args.get(finalI)));
- synchronized (size) {
- size.getAndIncrement();
- size.notifyAll();
- }
- });
- synchronized (queue) {
- queue.notifyAll();
- }
- }
- synchronized (size) {
- while (size.get() != args.size()) {
- size.wait();
- }
- }
- return result;
- }
- @Override
- public void close() {
- threads.forEach(Thread::interrupt);
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement