SHARE
TWEET

Untitled

a guest Jan 17th, 2019 68 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import lombok.Getter;
  2. import lombok.Setter;
  3. import lombok.ToString;
  4. import lombok.extern.slf4j.Slf4j;
  5.  
  6. import java.time.Duration;
  7. import java.time.LocalDateTime;
  8. import java.util.ArrayList;
  9. import java.util.Collections;
  10. import java.util.List;
  11. import java.util.concurrent.*;
  12. import java.util.concurrent.atomic.AtomicInteger;
  13. import java.util.concurrent.atomic.AtomicLong;
  14. import java.util.function.BiFunction;
  15. import java.util.function.Consumer;
  16. import java.util.function.Function;
  17. import java.util.stream.Collectors;
  18. import java.util.stream.IntStream;
  19.  
  20. /**
  21.  * supply support for concurrent execution <p>
  22.  * todo shutdown/cancel
  23.  *
  24.  * @author xq.h
  25.  * 2018/10/6 18:09
  26.  **/
  27. @Slf4j
  28. @SuppressWarnings("unused")
  29. public class ConcurrentHelper {
  30.  
  31.     public static class ExecuteFuture<S, R> {
  32.         private Future<List<TaskResult<S, R>>> results;
  33.         private LogHelper logHelper;
  34.  
  35.         public ExecuteResult<S, R> awaitFinish() throws ExecutionException, InterruptedException {
  36.             List<TaskResult<S, R>> results = this.results.get();
  37.             return new ExecuteResult<>(results, logHelper);
  38.         }
  39.  
  40.         public ExecuteResult<S, R> awaitFinish(TimeUnit unit, long timeout) throws ExecutionException, InterruptedException, TimeoutException {
  41.             List<TaskResult<S, R>> results = this.results.get(timeout, unit);
  42.             return new ExecuteResult<>(results, logHelper);
  43.         }
  44.  
  45.         public boolean cancel(boolean mayInterruptIfRunning) {
  46.             return results.cancel(mayInterruptIfRunning);
  47.         }
  48.  
  49.         ExecuteFuture(Future<List<TaskResult<S, R>>> results, LogHelper logHelper) {
  50.             this.logHelper = logHelper;
  51.             this.results = results;
  52.         }
  53.     }
  54.  
  55.     @Setter
  56.     @Getter
  57.     @ToString
  58.     public static class TaskResult<T, R> {
  59.         private TaskResult() {
  60.         }
  61.  
  62.         private T task;
  63.         private R result;
  64.         private boolean success;
  65.         private Throwable exception;
  66.     }
  67.  
  68.     public static class ExecuteResult<S, R> {
  69.         private List<TaskResult<S, R>> results;
  70.         private LogHelper logHelper;
  71.  
  72.         public long getTotalCount() {
  73.             return logHelper.getTotal();
  74.         }
  75.  
  76.         public long getSuccessCount() {
  77.             return logHelper.getSuccess();
  78.         }
  79.  
  80.         public long getFailedCount() {
  81.             return logHelper.total - logHelper.success.get();
  82.         }
  83.  
  84.         public Duration getDuration() {
  85.             return logHelper.getCost();
  86.         }
  87.  
  88.         public List<TaskResult<S, R>> getResultSet() {
  89.             return results;
  90.         }
  91.  
  92.  
  93.         ExecuteResult(List<TaskResult<S, R>> results, LogHelper logHelper) {
  94.             this.logHelper = logHelper;
  95.             this.results = results;
  96.         }
  97.  
  98.         public String getLogString() {
  99.             return String.format("Stage finish: cost: %s; total: %d; success: %d", logHelper.getCostString(), logHelper.getTotal(), logHelper.getSuccess());
  100.         }
  101.  
  102.         @Override
  103.         public String toString() {
  104.             StringBuilder stringBuilder = new StringBuilder();
  105.             stringBuilder.append("ExecuteResults:\n");
  106.             results.forEach(result -> stringBuilder.append(result).append("\n"));
  107.             stringBuilder.append("LogHelper:\n").append(logHelper);
  108.  
  109.             return stringBuilder.toString();
  110.         }
  111.     }
  112.  
  113.     /**
  114.      * for logging, such as process, success count, execution time cost
  115.      */
  116.     @ToString
  117.     private static class LogHelper {
  118.         private AtomicInteger process = new AtomicInteger(0);
  119.         private AtomicLong success = new AtomicLong(0L);
  120.         private LocalDateTime startTime = LocalDateTime.now();
  121.         @Getter
  122.         private final int total;
  123.         private boolean retry = false;
  124.         private int currentRetryCount = 0;
  125.         private int retryTotal = 0;
  126.  
  127.         LogHelper(int total) {
  128.             this.total = total;
  129.         }
  130.  
  131.         Duration getCost() {
  132.             return Duration.between(startTime, LocalDateTime.now());
  133.         }
  134.  
  135.         long getSuccess() {
  136.             return success.get();
  137.         }
  138.  
  139.         void plusSuccess() {
  140.             success.incrementAndGet();
  141.         }
  142.  
  143.         String getCostString() {
  144.             return getCost().toString()
  145.                     .substring(2)
  146.                     .replaceAll("(\\d[HMS])(?!$)", "$1 ")
  147.                     .toLowerCase();
  148.         }
  149.  
  150.         private void resetProcess() {
  151.             process.set(0);
  152.         }
  153.  
  154.         void nextRetry(int retryTotal) {
  155.             this.resetProcess();
  156.             this.retry = true;
  157.             this.retryTotal = retryTotal;
  158.             this.currentRetryCount++;
  159.         }
  160.  
  161.         private String printProcess() {
  162.             if (retry) {
  163.                 return String.format("process retry %d: %d/%d", currentRetryCount, process.incrementAndGet(), retryTotal);
  164.             } else {
  165.                 return String.format("process : %d/%d", process.incrementAndGet(), total);
  166.             }
  167.         }
  168.  
  169.     }
  170.  
  171.     /**
  172.      * default {@link #setRetryCount retryCount} is 0 <p>
  173.      * default {@link #setExecutorNum executorNum} is 1 <p>
  174.      * default {@link #setTaskExecutor taskExecutor} will do nothing <p>
  175.      *
  176.      * @param <S> task type
  177.      */
  178.     @Slf4j
  179.     @ToString
  180.     public static class Helper<S, R> {
  181.         private ConcurrentLinkedQueue<S> tasks;
  182.         private ConcurrentLinkedQueue<TaskResult<S, R>> results = new ConcurrentLinkedQueue<>();
  183.         // @formatter:off
  184.         private Function<S,R> taskExecutor = (one) -> null;
  185.         private BiFunction<S,R,Boolean> successJudge = (t,r)-> true;
  186.         // @formatter:on
  187.         private int executorNum = 1;
  188.         private int retryCount = 0;
  189.         private long retryDelay = 0L;
  190.         private LogHelper logHelper;
  191.  
  192.         private boolean concernSuccessResult = false;
  193.         private boolean concernResultData = false;
  194.         private boolean concernFailedException = false;
  195.         private boolean traceRetry = false;
  196.  
  197.         private Helper(ConcurrentLinkedQueue<S> tasks, @SuppressWarnings("unused") Class<R> resultType) {
  198.             this.tasks = tasks;
  199.             this.logHelper = new LogHelper(tasks.size());
  200.         }
  201.  
  202.         private Helper(ConcurrentLinkedQueue<S> tasks, @SuppressWarnings("unused") TypeReference<R> resultTypeReference) {
  203.             this.tasks = tasks;
  204.             this.logHelper = new LogHelper(tasks.size());
  205.         }
  206.  
  207.         /**
  208.          * include success execution result in execution result set
  209.          */
  210.         public Helper<S, R> concernSuccessResult() {
  211.             this.concernSuccessResult = true;
  212.             return this;
  213.         }
  214.  
  215.         /**
  216.          * include execution result data in execution result
  217.          */
  218.         public Helper<S, R> concernResultData() {
  219.             this.concernResultData = true;
  220.             return this;
  221.         }
  222.  
  223.         /**
  224.          * include exception in failed execution result
  225.          */
  226.         public Helper<S, R> concernFailedException() {
  227.             this.concernFailedException = true;
  228.             return this;
  229.         }
  230.  
  231.         /**
  232.          * failed execution will appear at execution result set
  233.          */
  234.         public Helper<S, R> traceRetry() {
  235.             this.traceRetry = true;
  236.             return this;
  237.         }
  238.  
  239.         /**
  240.          * default function is whatever the execution results, think it success as long as execution finish
  241.          *
  242.          * @return true if think it is success
  243.          */
  244.         public Helper<S, R> setSuccessJudge(BiFunction<S, R, Boolean> successJudge) {
  245.             this.successJudge = successJudge;
  246.             return this;
  247.         }
  248.  
  249.         /**
  250.          * default executor will do nothing
  251.          */
  252.         public Helper<S, R> setTaskExecutor(Function<S, R> taskExecutor) {
  253.             this.taskExecutor = taskExecutor;
  254.             return this;
  255.         }
  256.  
  257.         /**
  258.          * default executor will do nothing
  259.          */
  260.         public Helper<S, R> setTaskExecutor(Consumer<S> taskExecutor) {
  261.             this.taskExecutor = (s) -> {
  262.                 taskExecutor.accept(s);
  263.                 return null;
  264.             };
  265.             return this;
  266.         }
  267.  
  268.         /**
  269.          * default number is 1
  270.          */
  271.         public Helper<S, R> setExecutorNum(int executorNum) {
  272.             this.executorNum = executorNum;
  273.             return this;
  274.         }
  275.  
  276.         /**
  277.          * default retry count is 0, will not retry
  278.          */
  279.         public Helper<S, R> setRetryCount(int retryCount) {
  280.             this.retryCount = retryCount;
  281.             return this;
  282.         }
  283.  
  284.         /**
  285.          * default retry delay is 0, will retry immediately
  286.          */
  287.         public Helper<S, R> setRetryDelay(long retryDelay) {
  288.             this.retryDelay = retryDelay;
  289.             return this;
  290.         }
  291.  
  292.         /**
  293.          * execute task asynchronously
  294.          *
  295.          * @return the result which contains failed tasks, execution result
  296.          */
  297.         public ExecuteFuture<S, R> submit() {
  298.             if (tasks == null || tasks.isEmpty()) {
  299.                 return new ExecuteFuture<>(CompletableFuture.completedFuture(new ArrayList<>()), logHelper);
  300.             }
  301.             Future<List<TaskResult<S, R>>> submit = Executors.newSingleThreadExecutor().submit(() -> ConcurrentHelper.executeTask(this));
  302.             return new ExecuteFuture<>(submit, logHelper);
  303.         }
  304.     }
  305.  
  306.  
  307.     /**
  308.      * operator for task,
  309.      * fetch task for queue all the time as long as queue is not empty,
  310.      * transfer task to task executor.
  311.      */
  312.     private static class TaskOperator<S, R> implements Callable<List<TaskResult<S, R>>> {
  313.         private ConcurrentLinkedQueue<S> tasks;
  314.         private LogHelper logHelper;
  315.         private Function<S, R> taskExecutor;
  316.         private BiFunction<S, R, Boolean> successJudge;
  317.         private boolean concernSuccessResult;
  318.         private boolean concernResultData;
  319.         private boolean concernFailedException;
  320.  
  321.         TaskOperator(Helper<S, R> helper) {
  322.             this.tasks = helper.tasks;
  323.             this.logHelper = helper.logHelper;
  324.             this.taskExecutor = helper.taskExecutor;
  325.             this.successJudge = helper.successJudge;
  326.             this.concernSuccessResult = helper.concernSuccessResult;
  327.             this.concernResultData = helper.concernResultData;
  328.             this.concernFailedException = helper.concernFailedException;
  329.         }
  330.  
  331.         @Override
  332.         public List<TaskResult<S, R>> call() {
  333.             List<TaskResult<S, R>> results = new ArrayList<>();
  334.             try {
  335.                 while (!tasks.isEmpty()) {
  336.                     S poll = tasks.poll();
  337.                     TaskResult<S, R> taskResult = new TaskResult<>();
  338.                     taskResult.setTask(poll);
  339.                     try {
  340.                         if (poll != null) {
  341.                             log.debug("{} current:{}", logHelper.printProcess(), poll);
  342.                             R result = taskExecutor.apply(poll);
  343.                             if (concernResultData) {
  344.                                 taskResult.setResult(result);
  345.                             }
  346.                             Boolean success = successJudge.apply(poll, result);
  347.                             taskResult.setSuccess(success);
  348.                             if (success) {
  349.                                 logHelper.plusSuccess();
  350.                                 if (concernSuccessResult) {
  351.                                     results.add(taskResult);
  352.                                 }
  353.                             } else {
  354.                                 results.add(taskResult);
  355.                             }
  356.                         }
  357.                     } catch (Exception e) {
  358.                         taskResult.setSuccess(false);
  359.                         if (concernFailedException) {
  360.                             taskResult.setException(e);
  361.                         }
  362.                         results.add(taskResult);
  363.                         log.error("task execute failed: {}", poll, e);
  364.                     }
  365.                 }
  366.                 return results;
  367.             } catch (Exception e) {
  368.                 log.error("executor error", e);
  369.                 return results;
  370.             }
  371.         }
  372.  
  373.     }
  374.  
  375.     /**
  376.      * generic type carrier
  377.      *
  378.      * @param <T> generic type
  379.      */
  380.     @SuppressWarnings("unused")
  381.     public static abstract class TypeReference<T> {
  382.     }
  383.  
  384.     /**
  385.      * @param tasks      tasks to deal with
  386.      * @param resultType result type
  387.      * @param <S>        task type
  388.      * @param <R>        result type
  389.      * @return return the helper instance
  390.      */
  391.     public static <S, R> Helper<S, R> newInstance(ConcurrentLinkedQueue<S> tasks, Class<R> resultType) {
  392.         return new Helper<>(tasks, resultType);
  393.     }
  394.  
  395.     /**
  396.      * @param tasks               tasks to deal with
  397.      * @param resultTypeReference generic type carrier
  398.      * @param <S>                 task type
  399.      * @param <R>                 task execute result type
  400.      * @return return the helper instance
  401.      */
  402.     public static <S, R> Helper<S, R> newInstance(ConcurrentLinkedQueue<S> tasks, TypeReference<R> resultTypeReference) {
  403.         return new Helper<>(tasks, resultTypeReference);
  404.     }
  405.  
  406.     /**
  407.      * merge multi future into one
  408.      *
  409.      * @param futures futures to merge
  410.      * @param <S>     future task type
  411.      * @param <R>     future return type
  412.      * @return merged future
  413.      */
  414.     private static <S, R> Future<List<TaskResult<S, R>>> mergeFuture(List<Future<List<TaskResult<S, R>>>> futures) {
  415.         ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
  416.         return singleThreadExecutor.submit(() -> {
  417.             List<TaskResult<S, R>> allResult = new ArrayList<>();
  418.             futures.forEach(future -> {
  419.                 try {
  420.                     allResult.addAll(future.get());
  421.                 } catch (Exception e) {
  422.                     log.error("error occurred while getting future:{}", future, e);
  423.                 }
  424.             });
  425.             return allResult;
  426.         });
  427.     }
  428.  
  429.     /**
  430.      * @param helper the helper, can be the retry one
  431.      * @param <S>    task type
  432.      * @param <R>    task result type
  433.      * @return execute future
  434.      */
  435.     private static <S, R> List<TaskResult<S, R>> executeTask(Helper<S, R> helper) throws ExecutionException, InterruptedException {
  436.         ConcurrentLinkedQueue<S> tasks = helper.tasks;
  437.         boolean traceRetry = helper.traceRetry;
  438.         int executorNum = helper.executorNum;
  439.  
  440.         if (tasks == null || tasks.isEmpty()) {
  441.             return Collections.emptyList();
  442.         }
  443.  
  444.         ExecutorService executorService = Executors.newFixedThreadPool(executorNum);
  445.  
  446.         List<Future<List<TaskResult<S, R>>>> futures;
  447.         futures = IntStream.range(0, executorNum)
  448.                 .mapToObj(i -> executorService.submit(new TaskOperator<>(helper)))
  449.                 .collect(Collectors.toCollection(ArrayList::new));
  450.  
  451.         Future<List<TaskResult<S, R>>> future = ConcurrentHelper.mergeFuture(futures);
  452.  
  453.         List<TaskResult<S, R>> taskResults = future.get(); // wait last execution finish
  454.  
  455.         List<TaskResult<S, R>> failedResults = taskResults.stream()
  456.                 .filter(taskResult -> !taskResult.success)
  457.                 .collect(Collectors.toList());
  458.  
  459.         if (!traceRetry) {
  460.             taskResults.removeAll(failedResults);
  461.         }
  462.  
  463.         // retry
  464.         if (helper.retryCount > 0) {
  465.             ConcurrentLinkedQueue<S> failedTasks = failedResults.stream()
  466.                     .map(TaskResult::getTask)
  467.                     .collect(ConcurrentLinkedQueue::new, ConcurrentLinkedQueue::add, ConcurrentLinkedQueue::addAll);
  468.             helper.tasks = failedTasks; // failedTask put back to the task queue
  469.             log.debug("retrying for {}", failedTasks);
  470.             helper.retryCount--;
  471.             helper.logHelper.nextRetry(failedTasks.size());
  472.             Thread.sleep(helper.retryDelay);
  473.             taskResults.addAll(executeTask(helper));
  474.         } else {
  475.             if (!traceRetry) {
  476.                 taskResults.addAll(failedResults);
  477.             }
  478.         }
  479.  
  480.         return taskResults;
  481.     }
  482. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top