Advertisement
Guest User

Untitled

a guest
Mar 27th, 2017
40
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.75 KB | None | 0 0
  1. import info.kgeorgiy.java.advanced.mapper.ParallelMapper;
  2.  
  3. import java.util.ArrayDeque;
  4. import java.util.ArrayList;
  5. import java.util.List;
  6. import java.util.Queue;
  7. import java.util.function.Consumer;
  8. import java.util.function.Function;
  9.  
  10. /**
  11. * Created by SuperStrongDinosaur on 26.03.17.
  12. */
  13. public class ParallelMapperImpl implements ParallelMapper {
  14. final private Thread[] threads;
  15. final private Queue<Consumer<Void>> queue;
  16.  
  17. public ParallelMapperImpl(int cnt) {
  18. queue = new ArrayDeque<>();
  19. threads = new Thread[cnt];
  20. for (int i = 0; i < cnt; i++) {
  21. threads[i] = new Thread(() -> {
  22. try {
  23. while (!Thread.currentThread().isInterrupted()) {
  24. Consumer<Void> data = null;
  25. synchronized (queue) {
  26. if (!queue.isEmpty()) {
  27. data = queue.poll();
  28. }
  29. }
  30. if (data != null) {
  31. data.accept(null);
  32. synchronized (queue) {
  33. queue.notifyAll();
  34. }
  35. } else {
  36. synchronized (queue) {
  37. try {
  38. queue.wait();
  39. } catch (InterruptedException e) {
  40. return;
  41. }
  42. }
  43. }
  44. }
  45. } finally {
  46. Thread.currentThread().interrupt();
  47. }
  48. });
  49. threads[i].start();
  50. }
  51. }
  52.  
  53. @Override
  54. public <T, R> List<R> map(Function<? super T, ? extends R> f, List<? extends T> args) throws InterruptedException {
  55. List<R> result = new ArrayList<>();
  56. for (int i = 0; i < args.size(); i++) {
  57. result.add(null);
  58. }
  59. final int[] counter = {0};
  60. for (int i = 0; i < args.size(); i++) {
  61. final int current = i;
  62. synchronized (queue) {
  63. queue.add(aVoid -> {
  64. result.set(current, f.apply(args.get(current)));
  65. synchronized (counter) {
  66. counter[0]++;
  67. }
  68. });
  69.  
  70. }
  71. }
  72. synchronized (queue) {
  73. queue.notifyAll();
  74. while (counter[0] < args.size()) {
  75. queue.wait();
  76. }
  77. }
  78. return result;
  79. }
  80.  
  81. @Override
  82. public void close() throws InterruptedException {
  83. for (Thread thread : threads) {
  84. thread.interrupt();
  85. thread.join();
  86. }
  87. }
  88. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement