Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package ok.limits;
- import java.util.List;
- import java.util.Random;
- import java.util.concurrent.Callable;
- import java.util.concurrent.CompletionService;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.ExecutorCompletionService;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ThreadLocalRandom;
- import java.util.concurrent.atomic.AtomicInteger;
- import java.util.stream.Stream;
- import org.jetbrains.annotations.NotNull;
- import one.util.streamex.StreamEx;
- public class ParallelProducerBatchWriter {
- private static ThreadLocalRandom random = ThreadLocalRandom.current();
- public static void main(String[] args) {
- ExecutorService executor = Executors.newFixedThreadPool(10);
- CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
- int requestCount = 100;
- int batchSize = 10;
- System.out.println("Отправляем запросы на выполнение");
- for (int i = 0; i < requestCount; i++) {
- completionService.submit(createRequest(i));
- }
- System.out.println("Собираем результаты запросов в пачки");
- AtomicInteger counter = new AtomicInteger(0);
- // Пользуем StreamEx для groupRuns
- StreamEx.of(streamResults(completionService, requestCount))
- .groupRuns((prev, next) -> counter.incrementAndGet() % batchSize != 0)
- .forEach(ParallelProducerBatchWriter::writeBatch);
- executor.shutdown();
- }
- private static void writeBatch(List<String> result) {
- try {
- Thread.sleep(random.nextInt(100));
- System.out.println(result);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- @NotNull
- private static Callable<String> createRequest(int j) {
- return () -> {
- int time = random.nextInt(100);
- try {
- Thread.sleep(time);
- System.out.println(j + " finished");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return j + ": " + time + "ms";
- };
- }
- @NotNull
- private static Stream<String> streamResults(CompletionService<String> completionService, int requestCount) {
- try {
- return Stream.iterate(completionService.take().get(), (prev) ->
- {
- try {
- return completionService.take().get();
- } catch (InterruptedException | ExecutionException e) {
- e.printStackTrace();
- }
- return null;
- }).limit(requestCount);
- } catch (InterruptedException | ExecutionException e) {
- e.printStackTrace();
- return Stream.of();
- }
- }
- }
Add Comment
Please, Sign In to add comment