Guest User

Untitled

a guest
Jan 17th, 2019
119
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 15.79 KB | None | 0 0
  1. import lombok.Getter;
  2. import lombok.Setter;
  3. import lombok.ToString;
  4. import lombok.extern.slf4j.Slf4j;
  5.  
  6. import java.time.Duration;
  7. import java.time.LocalDateTime;
  8. import java.util.ArrayList;
  9. import java.util.Collections;
  10. import java.util.List;
  11. import java.util.concurrent.*;
  12. import java.util.concurrent.atomic.AtomicInteger;
  13. import java.util.concurrent.atomic.AtomicLong;
  14. import java.util.function.BiFunction;
  15. import java.util.function.Consumer;
  16. import java.util.function.Function;
  17. import java.util.stream.Collectors;
  18. import java.util.stream.IntStream;
  19.  
  20. /**
  21. * supply support for concurrent execution <p>
  22. * todo shutdown/cancel
  23. *
  24. * @author xq.h
  25. * 2018/10/6 18:09
  26. **/
  27. @Slf4j
  28. @SuppressWarnings("unused")
  29. public class ConcurrentHelper {
  30.  
  31. public static class ExecuteFuture<S, R> {
  32. private Future<List<TaskResult<S, R>>> results;
  33. private LogHelper logHelper;
  34.  
  35. public ExecuteResult<S, R> awaitFinish() throws ExecutionException, InterruptedException {
  36. List<TaskResult<S, R>> results = this.results.get();
  37. return new ExecuteResult<>(results, logHelper);
  38. }
  39.  
  40. public ExecuteResult<S, R> awaitFinish(TimeUnit unit, long timeout) throws ExecutionException, InterruptedException, TimeoutException {
  41. List<TaskResult<S, R>> results = this.results.get(timeout, unit);
  42. return new ExecuteResult<>(results, logHelper);
  43. }
  44.  
  45. public boolean cancel(boolean mayInterruptIfRunning) {
  46. return results.cancel(mayInterruptIfRunning);
  47. }
  48.  
  49. ExecuteFuture(Future<List<TaskResult<S, R>>> results, LogHelper logHelper) {
  50. this.logHelper = logHelper;
  51. this.results = results;
  52. }
  53. }
  54.  
  55. @Setter
  56. @Getter
  57. @ToString
  58. public static class TaskResult<T, R> {
  59. private TaskResult() {
  60. }
  61.  
  62. private T task;
  63. private R result;
  64. private boolean success;
  65. private Throwable exception;
  66. }
  67.  
  68. public static class ExecuteResult<S, R> {
  69. private List<TaskResult<S, R>> results;
  70. private LogHelper logHelper;
  71.  
  72. public long getTotalCount() {
  73. return logHelper.getTotal();
  74. }
  75.  
  76. public long getSuccessCount() {
  77. return logHelper.getSuccess();
  78. }
  79.  
  80. public long getFailedCount() {
  81. return logHelper.total - logHelper.success.get();
  82. }
  83.  
  84. public Duration getDuration() {
  85. return logHelper.getCost();
  86. }
  87.  
  88. public List<TaskResult<S, R>> getResultSet() {
  89. return results;
  90. }
  91.  
  92.  
  93. ExecuteResult(List<TaskResult<S, R>> results, LogHelper logHelper) {
  94. this.logHelper = logHelper;
  95. this.results = results;
  96. }
  97.  
  98. public String getLogString() {
  99. return String.format("Stage finish: cost: %s; total: %d; success: %d", logHelper.getCostString(), logHelper.getTotal(), logHelper.getSuccess());
  100. }
  101.  
  102. @Override
  103. public String toString() {
  104. StringBuilder stringBuilder = new StringBuilder();
  105. stringBuilder.append("ExecuteResults:\n");
  106. results.forEach(result -> stringBuilder.append(result).append("\n"));
  107. stringBuilder.append("LogHelper:\n").append(logHelper);
  108.  
  109. return stringBuilder.toString();
  110. }
  111. }
  112.  
  113. /**
  114. * for logging, such as process, success count, execution time cost
  115. */
  116. @ToString
  117. private static class LogHelper {
  118. private AtomicInteger process = new AtomicInteger(0);
  119. private AtomicLong success = new AtomicLong(0L);
  120. private LocalDateTime startTime = LocalDateTime.now();
  121. @Getter
  122. private final int total;
  123. private boolean retry = false;
  124. private int currentRetryCount = 0;
  125. private int retryTotal = 0;
  126.  
  127. LogHelper(int total) {
  128. this.total = total;
  129. }
  130.  
  131. Duration getCost() {
  132. return Duration.between(startTime, LocalDateTime.now());
  133. }
  134.  
  135. long getSuccess() {
  136. return success.get();
  137. }
  138.  
  139. void plusSuccess() {
  140. success.incrementAndGet();
  141. }
  142.  
  143. String getCostString() {
  144. return getCost().toString()
  145. .substring(2)
  146. .replaceAll("(\\d[HMS])(?!$)", "$1 ")
  147. .toLowerCase();
  148. }
  149.  
  150. private void resetProcess() {
  151. process.set(0);
  152. }
  153.  
  154. void nextRetry(int retryTotal) {
  155. this.resetProcess();
  156. this.retry = true;
  157. this.retryTotal = retryTotal;
  158. this.currentRetryCount++;
  159. }
  160.  
  161. private String printProcess() {
  162. if (retry) {
  163. return String.format("process retry %d: %d/%d", currentRetryCount, process.incrementAndGet(), retryTotal);
  164. } else {
  165. return String.format("process : %d/%d", process.incrementAndGet(), total);
  166. }
  167. }
  168.  
  169. }
  170.  
  171. /**
  172. * default {@link #setRetryCount retryCount} is 0 <p>
  173. * default {@link #setExecutorNum executorNum} is 1 <p>
  174. * default {@link #setTaskExecutor taskExecutor} will do nothing <p>
  175. *
  176. * @param <S> task type
  177. */
  178. @Slf4j
  179. @ToString
  180. public static class Helper<S, R> {
  181. private ConcurrentLinkedQueue<S> tasks;
  182. private ConcurrentLinkedQueue<TaskResult<S, R>> results = new ConcurrentLinkedQueue<>();
  183. // @formatter:off
  184. private Function<S,R> taskExecutor = (one) -> null;
  185. private BiFunction<S,R,Boolean> successJudge = (t,r)-> true;
  186. // @formatter:on
  187. private int executorNum = 1;
  188. private int retryCount = 0;
  189. private long retryDelay = 0L;
  190. private LogHelper logHelper;
  191.  
  192. private boolean concernSuccessResult = false;
  193. private boolean concernResultData = false;
  194. private boolean concernFailedException = false;
  195. private boolean traceRetry = false;
  196.  
  197. private Helper(ConcurrentLinkedQueue<S> tasks, @SuppressWarnings("unused") Class<R> resultType) {
  198. this.tasks = tasks;
  199. this.logHelper = new LogHelper(tasks.size());
  200. }
  201.  
  202. private Helper(ConcurrentLinkedQueue<S> tasks, @SuppressWarnings("unused") TypeReference<R> resultTypeReference) {
  203. this.tasks = tasks;
  204. this.logHelper = new LogHelper(tasks.size());
  205. }
  206.  
  207. /**
  208. * include success execution result in execution result set
  209. */
  210. public Helper<S, R> concernSuccessResult() {
  211. this.concernSuccessResult = true;
  212. return this;
  213. }
  214.  
  215. /**
  216. * include execution result data in execution result
  217. */
  218. public Helper<S, R> concernResultData() {
  219. this.concernResultData = true;
  220. return this;
  221. }
  222.  
  223. /**
  224. * include exception in failed execution result
  225. */
  226. public Helper<S, R> concernFailedException() {
  227. this.concernFailedException = true;
  228. return this;
  229. }
  230.  
  231. /**
  232. * failed execution will appear at execution result set
  233. */
  234. public Helper<S, R> traceRetry() {
  235. this.traceRetry = true;
  236. return this;
  237. }
  238.  
  239. /**
  240. * default function is whatever the execution results, think it success as long as execution finish
  241. *
  242. * @return true if think it is success
  243. */
  244. public Helper<S, R> setSuccessJudge(BiFunction<S, R, Boolean> successJudge) {
  245. this.successJudge = successJudge;
  246. return this;
  247. }
  248.  
  249. /**
  250. * default executor will do nothing
  251. */
  252. public Helper<S, R> setTaskExecutor(Function<S, R> taskExecutor) {
  253. this.taskExecutor = taskExecutor;
  254. return this;
  255. }
  256.  
  257. /**
  258. * default executor will do nothing
  259. */
  260. public Helper<S, R> setTaskExecutor(Consumer<S> taskExecutor) {
  261. this.taskExecutor = (s) -> {
  262. taskExecutor.accept(s);
  263. return null;
  264. };
  265. return this;
  266. }
  267.  
  268. /**
  269. * default number is 1
  270. */
  271. public Helper<S, R> setExecutorNum(int executorNum) {
  272. this.executorNum = executorNum;
  273. return this;
  274. }
  275.  
  276. /**
  277. * default retry count is 0, will not retry
  278. */
  279. public Helper<S, R> setRetryCount(int retryCount) {
  280. this.retryCount = retryCount;
  281. return this;
  282. }
  283.  
  284. /**
  285. * default retry delay is 0, will retry immediately
  286. */
  287. public Helper<S, R> setRetryDelay(long retryDelay) {
  288. this.retryDelay = retryDelay;
  289. return this;
  290. }
  291.  
  292. /**
  293. * execute task asynchronously
  294. *
  295. * @return the result which contains failed tasks, execution result
  296. */
  297. public ExecuteFuture<S, R> submit() {
  298. if (tasks == null || tasks.isEmpty()) {
  299. return new ExecuteFuture<>(CompletableFuture.completedFuture(new ArrayList<>()), logHelper);
  300. }
  301. Future<List<TaskResult<S, R>>> submit = Executors.newSingleThreadExecutor().submit(() -> ConcurrentHelper.executeTask(this));
  302. return new ExecuteFuture<>(submit, logHelper);
  303. }
  304. }
  305.  
  306.  
  307. /**
  308. * operator for task,
  309. * fetch task for queue all the time as long as queue is not empty,
  310. * transfer task to task executor.
  311. */
  312. private static class TaskOperator<S, R> implements Callable<List<TaskResult<S, R>>> {
  313. private ConcurrentLinkedQueue<S> tasks;
  314. private LogHelper logHelper;
  315. private Function<S, R> taskExecutor;
  316. private BiFunction<S, R, Boolean> successJudge;
  317. private boolean concernSuccessResult;
  318. private boolean concernResultData;
  319. private boolean concernFailedException;
  320.  
  321. TaskOperator(Helper<S, R> helper) {
  322. this.tasks = helper.tasks;
  323. this.logHelper = helper.logHelper;
  324. this.taskExecutor = helper.taskExecutor;
  325. this.successJudge = helper.successJudge;
  326. this.concernSuccessResult = helper.concernSuccessResult;
  327. this.concernResultData = helper.concernResultData;
  328. this.concernFailedException = helper.concernFailedException;
  329. }
  330.  
  331. @Override
  332. public List<TaskResult<S, R>> call() {
  333. List<TaskResult<S, R>> results = new ArrayList<>();
  334. try {
  335. while (!tasks.isEmpty()) {
  336. S poll = tasks.poll();
  337. TaskResult<S, R> taskResult = new TaskResult<>();
  338. taskResult.setTask(poll);
  339. try {
  340. if (poll != null) {
  341. log.debug("{} current:{}", logHelper.printProcess(), poll);
  342. R result = taskExecutor.apply(poll);
  343. if (concernResultData) {
  344. taskResult.setResult(result);
  345. }
  346. Boolean success = successJudge.apply(poll, result);
  347. taskResult.setSuccess(success);
  348. if (success) {
  349. logHelper.plusSuccess();
  350. if (concernSuccessResult) {
  351. results.add(taskResult);
  352. }
  353. } else {
  354. results.add(taskResult);
  355. }
  356. }
  357. } catch (Exception e) {
  358. taskResult.setSuccess(false);
  359. if (concernFailedException) {
  360. taskResult.setException(e);
  361. }
  362. results.add(taskResult);
  363. log.error("task execute failed: {}", poll, e);
  364. }
  365. }
  366. return results;
  367. } catch (Exception e) {
  368. log.error("executor error", e);
  369. return results;
  370. }
  371. }
  372.  
  373. }
  374.  
  375. /**
  376. * generic type carrier
  377. *
  378. * @param <T> generic type
  379. */
  380. @SuppressWarnings("unused")
  381. public static abstract class TypeReference<T> {
  382. }
  383.  
  384. /**
  385. * @param tasks tasks to deal with
  386. * @param resultType result type
  387. * @param <S> task type
  388. * @param <R> result type
  389. * @return return the helper instance
  390. */
  391. public static <S, R> Helper<S, R> newInstance(ConcurrentLinkedQueue<S> tasks, Class<R> resultType) {
  392. return new Helper<>(tasks, resultType);
  393. }
  394.  
  395. /**
  396. * @param tasks tasks to deal with
  397. * @param resultTypeReference generic type carrier
  398. * @param <S> task type
  399. * @param <R> task execute result type
  400. * @return return the helper instance
  401. */
  402. public static <S, R> Helper<S, R> newInstance(ConcurrentLinkedQueue<S> tasks, TypeReference<R> resultTypeReference) {
  403. return new Helper<>(tasks, resultTypeReference);
  404. }
  405.  
  406. /**
  407. * merge multi future into one
  408. *
  409. * @param futures futures to merge
  410. * @param <S> future task type
  411. * @param <R> future return type
  412. * @return merged future
  413. */
  414. private static <S, R> Future<List<TaskResult<S, R>>> mergeFuture(List<Future<List<TaskResult<S, R>>>> futures) {
  415. ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
  416. return singleThreadExecutor.submit(() -> {
  417. List<TaskResult<S, R>> allResult = new ArrayList<>();
  418. futures.forEach(future -> {
  419. try {
  420. allResult.addAll(future.get());
  421. } catch (Exception e) {
  422. log.error("error occurred while getting future:{}", future, e);
  423. }
  424. });
  425. return allResult;
  426. });
  427. }
  428.  
  429. /**
  430. * @param helper the helper, can be the retry one
  431. * @param <S> task type
  432. * @param <R> task result type
  433. * @return execute future
  434. */
  435. private static <S, R> List<TaskResult<S, R>> executeTask(Helper<S, R> helper) throws ExecutionException, InterruptedException {
  436. ConcurrentLinkedQueue<S> tasks = helper.tasks;
  437. boolean traceRetry = helper.traceRetry;
  438. int executorNum = helper.executorNum;
  439.  
  440. if (tasks == null || tasks.isEmpty()) {
  441. return Collections.emptyList();
  442. }
  443.  
  444. ExecutorService executorService = Executors.newFixedThreadPool(executorNum);
  445.  
  446. List<Future<List<TaskResult<S, R>>>> futures;
  447. futures = IntStream.range(0, executorNum)
  448. .mapToObj(i -> executorService.submit(new TaskOperator<>(helper)))
  449. .collect(Collectors.toCollection(ArrayList::new));
  450.  
  451. Future<List<TaskResult<S, R>>> future = ConcurrentHelper.mergeFuture(futures);
  452.  
  453. List<TaskResult<S, R>> taskResults = future.get(); // wait last execution finish
  454.  
  455. List<TaskResult<S, R>> failedResults = taskResults.stream()
  456. .filter(taskResult -> !taskResult.success)
  457. .collect(Collectors.toList());
  458.  
  459. if (!traceRetry) {
  460. taskResults.removeAll(failedResults);
  461. }
  462.  
  463. // retry
  464. if (helper.retryCount > 0) {
  465. ConcurrentLinkedQueue<S> failedTasks = failedResults.stream()
  466. .map(TaskResult::getTask)
  467. .collect(ConcurrentLinkedQueue::new, ConcurrentLinkedQueue::add, ConcurrentLinkedQueue::addAll);
  468. helper.tasks = failedTasks; // failedTask put back to the task queue
  469. log.debug("retrying for {}", failedTasks);
  470. helper.retryCount--;
  471. helper.logHelper.nextRetry(failedTasks.size());
  472. Thread.sleep(helper.retryDelay);
  473. taskResults.addAll(executeTask(helper));
  474. } else {
  475. if (!traceRetry) {
  476. taskResults.addAll(failedResults);
  477. }
  478. }
  479.  
  480. return taskResults;
  481. }
  482. }
Add Comment
Please, Sign In to add comment