Guest User

Untitled

a guest
Jul 19th, 2018
73
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.86 KB | None | 0 0
  1. package ok.limits;
  2.  
  3. import java.util.List;
  4. import java.util.Random;
  5. import java.util.concurrent.Callable;
  6. import java.util.concurrent.CompletionService;
  7. import java.util.concurrent.ExecutionException;
  8. import java.util.concurrent.ExecutorCompletionService;
  9. import java.util.concurrent.ExecutorService;
  10. import java.util.concurrent.Executors;
  11. import java.util.concurrent.ThreadLocalRandom;
  12. import java.util.concurrent.atomic.AtomicInteger;
  13. import java.util.stream.Stream;
  14.  
  15. import org.jetbrains.annotations.NotNull;
  16.  
  17. import one.util.streamex.StreamEx;
  18.  
  19. public class ParallelProducerBatchWriter {
  20.  
  21. private static ThreadLocalRandom random = ThreadLocalRandom.current();
  22.  
  23. public static void main(String[] args) {
  24. ExecutorService executor = Executors.newFixedThreadPool(10);
  25. CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
  26. int requestCount = 100;
  27. int batchSize = 10;
  28.  
  29.  
  30. System.out.println("Отправляем запросы на выполнение");
  31. for (int i = 0; i < requestCount; i++) {
  32. completionService.submit(createRequest(i));
  33. }
  34.  
  35. System.out.println("Собираем результаты запросов в пачки");
  36. AtomicInteger counter = new AtomicInteger(0);
  37. // Пользуем StreamEx для groupRuns
  38. StreamEx.of(streamResults(completionService, requestCount))
  39. .groupRuns((prev, next) -> counter.incrementAndGet() % batchSize != 0)
  40. .forEach(ParallelProducerBatchWriter::writeBatch);
  41.  
  42. executor.shutdown();
  43.  
  44.  
  45. }
  46.  
  47. private static void writeBatch(List<String> result) {
  48. try {
  49. Thread.sleep(random.nextInt(100));
  50. System.out.println(result);
  51. } catch (InterruptedException e) {
  52. e.printStackTrace();
  53. }
  54. }
  55.  
  56. @NotNull
  57. private static Callable<String> createRequest(int j) {
  58. return () -> {
  59. int time = random.nextInt(100);
  60. try {
  61. Thread.sleep(time);
  62. System.out.println(j + " finished");
  63. } catch (InterruptedException e) {
  64. e.printStackTrace();
  65. }
  66. return j + ": " + time + "ms";
  67. };
  68. }
  69.  
  70. @NotNull
  71. private static Stream<String> streamResults(CompletionService<String> completionService, int requestCount) {
  72. try {
  73. return Stream.iterate(completionService.take().get(), (prev) ->
  74. {
  75. try {
  76. return completionService.take().get();
  77. } catch (InterruptedException | ExecutionException e) {
  78. e.printStackTrace();
  79. }
  80. return null;
  81. }).limit(requestCount);
  82. } catch (InterruptedException | ExecutionException e) {
  83. e.printStackTrace();
  84. return Stream.of();
  85. }
  86. }
  87.  
  88.  
  89. }
Add Comment
Please, Sign In to add comment