Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import lombok.Getter;
- import lombok.Setter;
- import lombok.ToString;
- import lombok.extern.slf4j.Slf4j;
- import java.time.Duration;
- import java.time.LocalDateTime;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.List;
- import java.util.concurrent.*;
- import java.util.concurrent.atomic.AtomicInteger;
- import java.util.concurrent.atomic.AtomicLong;
- import java.util.function.BiFunction;
- import java.util.function.Consumer;
- import java.util.function.Function;
- import java.util.stream.Collectors;
- import java.util.stream.IntStream;
- /**
- * supply support for concurrent execution <p>
- * todo shutdown/cancel
- *
- * @author xq.h
- * 2018/10/6 18:09
- **/
- @Slf4j
- @SuppressWarnings("unused")
- public class ConcurrentHelper {
- public static class ExecuteFuture<S, R> {
- private Future<List<TaskResult<S, R>>> results;
- private LogHelper logHelper;
- public ExecuteResult<S, R> awaitFinish() throws ExecutionException, InterruptedException {
- List<TaskResult<S, R>> results = this.results.get();
- return new ExecuteResult<>(results, logHelper);
- }
- public ExecuteResult<S, R> awaitFinish(TimeUnit unit, long timeout) throws ExecutionException, InterruptedException, TimeoutException {
- List<TaskResult<S, R>> results = this.results.get(timeout, unit);
- return new ExecuteResult<>(results, logHelper);
- }
- public boolean cancel(boolean mayInterruptIfRunning) {
- return results.cancel(mayInterruptIfRunning);
- }
- ExecuteFuture(Future<List<TaskResult<S, R>>> results, LogHelper logHelper) {
- this.logHelper = logHelper;
- this.results = results;
- }
- }
- @Setter
- @Getter
- @ToString
- public static class TaskResult<T, R> {
- private TaskResult() {
- }
- private T task;
- private R result;
- private boolean success;
- private Throwable exception;
- }
- public static class ExecuteResult<S, R> {
- private List<TaskResult<S, R>> results;
- private LogHelper logHelper;
- public long getTotalCount() {
- return logHelper.getTotal();
- }
- public long getSuccessCount() {
- return logHelper.getSuccess();
- }
- public long getFailedCount() {
- return logHelper.total - logHelper.success.get();
- }
- public Duration getDuration() {
- return logHelper.getCost();
- }
- public List<TaskResult<S, R>> getResultSet() {
- return results;
- }
- ExecuteResult(List<TaskResult<S, R>> results, LogHelper logHelper) {
- this.logHelper = logHelper;
- this.results = results;
- }
- public String getLogString() {
- return String.format("Stage finish: cost: %s; total: %d; success: %d", logHelper.getCostString(), logHelper.getTotal(), logHelper.getSuccess());
- }
- @Override
- public String toString() {
- StringBuilder stringBuilder = new StringBuilder();
- stringBuilder.append("ExecuteResults:\n");
- results.forEach(result -> stringBuilder.append(result).append("\n"));
- stringBuilder.append("LogHelper:\n").append(logHelper);
- return stringBuilder.toString();
- }
- }
- /**
- * for logging, such as process, success count, execution time cost
- */
- @ToString
- private static class LogHelper {
- private AtomicInteger process = new AtomicInteger(0);
- private AtomicLong success = new AtomicLong(0L);
- private LocalDateTime startTime = LocalDateTime.now();
- @Getter
- private final int total;
- private boolean retry = false;
- private int currentRetryCount = 0;
- private int retryTotal = 0;
- LogHelper(int total) {
- this.total = total;
- }
- Duration getCost() {
- return Duration.between(startTime, LocalDateTime.now());
- }
- long getSuccess() {
- return success.get();
- }
- void plusSuccess() {
- success.incrementAndGet();
- }
- String getCostString() {
- return getCost().toString()
- .substring(2)
- .replaceAll("(\\d[HMS])(?!$)", "$1 ")
- .toLowerCase();
- }
- private void resetProcess() {
- process.set(0);
- }
- void nextRetry(int retryTotal) {
- this.resetProcess();
- this.retry = true;
- this.retryTotal = retryTotal;
- this.currentRetryCount++;
- }
- private String printProcess() {
- if (retry) {
- return String.format("process retry %d: %d/%d", currentRetryCount, process.incrementAndGet(), retryTotal);
- } else {
- return String.format("process : %d/%d", process.incrementAndGet(), total);
- }
- }
- }
- /**
- * default {@link #setRetryCount retryCount} is 0 <p>
- * default {@link #setExecutorNum executorNum} is 1 <p>
- * default {@link #setTaskExecutor taskExecutor} will do nothing <p>
- *
- * @param <S> task type
- */
- @Slf4j
- @ToString
- public static class Helper<S, R> {
- private ConcurrentLinkedQueue<S> tasks;
- private ConcurrentLinkedQueue<TaskResult<S, R>> results = new ConcurrentLinkedQueue<>();
- // @formatter:off
- private Function<S,R> taskExecutor = (one) -> null;
- private BiFunction<S,R,Boolean> successJudge = (t,r)-> true;
- // @formatter:on
- private int executorNum = 1;
- private int retryCount = 0;
- private long retryDelay = 0L;
- private LogHelper logHelper;
- private boolean concernSuccessResult = false;
- private boolean concernResultData = false;
- private boolean concernFailedException = false;
- private boolean traceRetry = false;
- private Helper(ConcurrentLinkedQueue<S> tasks, @SuppressWarnings("unused") Class<R> resultType) {
- this.tasks = tasks;
- this.logHelper = new LogHelper(tasks.size());
- }
- private Helper(ConcurrentLinkedQueue<S> tasks, @SuppressWarnings("unused") TypeReference<R> resultTypeReference) {
- this.tasks = tasks;
- this.logHelper = new LogHelper(tasks.size());
- }
- /**
- * include success execution result in execution result set
- */
- public Helper<S, R> concernSuccessResult() {
- this.concernSuccessResult = true;
- return this;
- }
- /**
- * include execution result data in execution result
- */
- public Helper<S, R> concernResultData() {
- this.concernResultData = true;
- return this;
- }
- /**
- * include exception in failed execution result
- */
- public Helper<S, R> concernFailedException() {
- this.concernFailedException = true;
- return this;
- }
- /**
- * failed execution will appear at execution result set
- */
- public Helper<S, R> traceRetry() {
- this.traceRetry = true;
- return this;
- }
- /**
- * default function is whatever the execution results, think it success as long as execution finish
- *
- * @return true if think it is success
- */
- public Helper<S, R> setSuccessJudge(BiFunction<S, R, Boolean> successJudge) {
- this.successJudge = successJudge;
- return this;
- }
- /**
- * default executor will do nothing
- */
- public Helper<S, R> setTaskExecutor(Function<S, R> taskExecutor) {
- this.taskExecutor = taskExecutor;
- return this;
- }
- /**
- * default executor will do nothing
- */
- public Helper<S, R> setTaskExecutor(Consumer<S> taskExecutor) {
- this.taskExecutor = (s) -> {
- taskExecutor.accept(s);
- return null;
- };
- return this;
- }
- /**
- * default number is 1
- */
- public Helper<S, R> setExecutorNum(int executorNum) {
- this.executorNum = executorNum;
- return this;
- }
- /**
- * default retry count is 0, will not retry
- */
- public Helper<S, R> setRetryCount(int retryCount) {
- this.retryCount = retryCount;
- return this;
- }
- /**
- * default retry delay is 0, will retry immediately
- */
- public Helper<S, R> setRetryDelay(long retryDelay) {
- this.retryDelay = retryDelay;
- return this;
- }
- /**
- * execute task asynchronously
- *
- * @return the result which contains failed tasks, execution result
- */
- public ExecuteFuture<S, R> submit() {
- if (tasks == null || tasks.isEmpty()) {
- return new ExecuteFuture<>(CompletableFuture.completedFuture(new ArrayList<>()), logHelper);
- }
- Future<List<TaskResult<S, R>>> submit = Executors.newSingleThreadExecutor().submit(() -> ConcurrentHelper.executeTask(this));
- return new ExecuteFuture<>(submit, logHelper);
- }
- }
- /**
- * operator for task,
- * fetch task for queue all the time as long as queue is not empty,
- * transfer task to task executor.
- */
- private static class TaskOperator<S, R> implements Callable<List<TaskResult<S, R>>> {
- private ConcurrentLinkedQueue<S> tasks;
- private LogHelper logHelper;
- private Function<S, R> taskExecutor;
- private BiFunction<S, R, Boolean> successJudge;
- private boolean concernSuccessResult;
- private boolean concernResultData;
- private boolean concernFailedException;
- TaskOperator(Helper<S, R> helper) {
- this.tasks = helper.tasks;
- this.logHelper = helper.logHelper;
- this.taskExecutor = helper.taskExecutor;
- this.successJudge = helper.successJudge;
- this.concernSuccessResult = helper.concernSuccessResult;
- this.concernResultData = helper.concernResultData;
- this.concernFailedException = helper.concernFailedException;
- }
- @Override
- public List<TaskResult<S, R>> call() {
- List<TaskResult<S, R>> results = new ArrayList<>();
- try {
- while (!tasks.isEmpty()) {
- S poll = tasks.poll();
- TaskResult<S, R> taskResult = new TaskResult<>();
- taskResult.setTask(poll);
- try {
- if (poll != null) {
- log.debug("{} current:{}", logHelper.printProcess(), poll);
- R result = taskExecutor.apply(poll);
- if (concernResultData) {
- taskResult.setResult(result);
- }
- Boolean success = successJudge.apply(poll, result);
- taskResult.setSuccess(success);
- if (success) {
- logHelper.plusSuccess();
- if (concernSuccessResult) {
- results.add(taskResult);
- }
- } else {
- results.add(taskResult);
- }
- }
- } catch (Exception e) {
- taskResult.setSuccess(false);
- if (concernFailedException) {
- taskResult.setException(e);
- }
- results.add(taskResult);
- log.error("task execute failed: {}", poll, e);
- }
- }
- return results;
- } catch (Exception e) {
- log.error("executor error", e);
- return results;
- }
- }
- }
- /**
- * generic type carrier
- *
- * @param <T> generic type
- */
- @SuppressWarnings("unused")
- public static abstract class TypeReference<T> {
- }
- /**
- * @param tasks tasks to deal with
- * @param resultType result type
- * @param <S> task type
- * @param <R> result type
- * @return return the helper instance
- */
- public static <S, R> Helper<S, R> newInstance(ConcurrentLinkedQueue<S> tasks, Class<R> resultType) {
- return new Helper<>(tasks, resultType);
- }
- /**
- * @param tasks tasks to deal with
- * @param resultTypeReference generic type carrier
- * @param <S> task type
- * @param <R> task execute result type
- * @return return the helper instance
- */
- public static <S, R> Helper<S, R> newInstance(ConcurrentLinkedQueue<S> tasks, TypeReference<R> resultTypeReference) {
- return new Helper<>(tasks, resultTypeReference);
- }
- /**
- * merge multi future into one
- *
- * @param futures futures to merge
- * @param <S> future task type
- * @param <R> future return type
- * @return merged future
- */
- private static <S, R> Future<List<TaskResult<S, R>>> mergeFuture(List<Future<List<TaskResult<S, R>>>> futures) {
- ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
- return singleThreadExecutor.submit(() -> {
- List<TaskResult<S, R>> allResult = new ArrayList<>();
- futures.forEach(future -> {
- try {
- allResult.addAll(future.get());
- } catch (Exception e) {
- log.error("error occurred while getting future:{}", future, e);
- }
- });
- return allResult;
- });
- }
- /**
- * @param helper the helper, can be the retry one
- * @param <S> task type
- * @param <R> task result type
- * @return execute future
- */
- private static <S, R> List<TaskResult<S, R>> executeTask(Helper<S, R> helper) throws ExecutionException, InterruptedException {
- ConcurrentLinkedQueue<S> tasks = helper.tasks;
- boolean traceRetry = helper.traceRetry;
- int executorNum = helper.executorNum;
- if (tasks == null || tasks.isEmpty()) {
- return Collections.emptyList();
- }
- ExecutorService executorService = Executors.newFixedThreadPool(executorNum);
- List<Future<List<TaskResult<S, R>>>> futures;
- futures = IntStream.range(0, executorNum)
- .mapToObj(i -> executorService.submit(new TaskOperator<>(helper)))
- .collect(Collectors.toCollection(ArrayList::new));
- Future<List<TaskResult<S, R>>> future = ConcurrentHelper.mergeFuture(futures);
- List<TaskResult<S, R>> taskResults = future.get(); // wait last execution finish
- List<TaskResult<S, R>> failedResults = taskResults.stream()
- .filter(taskResult -> !taskResult.success)
- .collect(Collectors.toList());
- if (!traceRetry) {
- taskResults.removeAll(failedResults);
- }
- // retry
- if (helper.retryCount > 0) {
- ConcurrentLinkedQueue<S> failedTasks = failedResults.stream()
- .map(TaskResult::getTask)
- .collect(ConcurrentLinkedQueue::new, ConcurrentLinkedQueue::add, ConcurrentLinkedQueue::addAll);
- helper.tasks = failedTasks; // failedTask put back to the task queue
- log.debug("retrying for {}", failedTasks);
- helper.retryCount--;
- helper.logHelper.nextRetry(failedTasks.size());
- Thread.sleep(helper.retryDelay);
- taskResults.addAll(executeTask(helper));
- } else {
- if (!traceRetry) {
- taskResults.addAll(failedResults);
- }
- }
- return taskResults;
- }
- }
Add Comment
Please, Sign In to add comment