Guest User

Untitled

a guest
Oct 13th, 2019
68
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package eu.pacholczyk.fp;
  2.  
  3. import org.apache.commons.lang3.RandomStringUtils;
  4.  
  5. import java.util.List;
  6. import java.util.Random;
  7. import java.util.concurrent.CompletableFuture;
  8. import java.util.concurrent.TimeUnit;
  9. import java.util.function.Supplier;
  10. import java.util.stream.IntStream;
  11. import java.util.stream.Stream;
  12.  
  13. import static java.util.concurrent.CompletableFuture.completedFuture;
  14. import static java.util.stream.Collectors.toList;
  15.  
  16. /*
  17. * Problem: given stream of operations returning CompletableFuture, make sure all the operations are executed
  18. * sequentially one-by-one - in any order, but with only one operation executing at a time.
  19. * <br/>
  20. * First solution (v1) with map+reduce is incorrect as the operation is executed when the next element is requested
  21. * from the stream, and before the previous had chance to complete
  22. * <br/>
  23. * Practical example: calling AWS SDK to create tags for a resource, but due to requests throttling waiting until
  24. * given tag is created before requesting for the next one.
  25. */
  26. public class FuturesTest {
  27. public static void main(String[] args) {
  28. run_v1(getOperationsStream(20));
  29. run_v2(getOperationsStream(20));
  30. run_v3(getOperationsStream(20));
  31. run_v4(getOperationsStream(20), false);
  32. run_v4(getOperationsStream(20), true);
  33. }
  34.  
  35. static Stream<Operation> getOperationsStream(int length) {
  36. return IntStream.range(0, length).mapToObj(l -> new Operation(l));
  37. }
  38.  
  39. static void run_v1(Stream<Operation> operationsStream) {
  40. System.out.println("--- START #1 ---");
  41.  
  42. CompletableFuture<String> all;
  43.  
  44. all = operationsStream
  45. .map(o -> o.perform())
  46. .reduce(CompletableFuture.completedFuture(null), (chain, next) -> chain.thenCompose(s -> next));
  47.  
  48. all.join();
  49. System.out.println("--- END #1 ---\n");
  50. }
  51.  
  52. static void run_v2(Stream<Operation> operationsStream) {
  53. System.out.println("--- START #2 ---");
  54.  
  55. List<Operation> operations = operationsStream.collect(toList());
  56.  
  57. CompletableFuture<String> chain = completedFuture(null);
  58. for (Operation o : operations) {
  59. chain = chain.thenCompose(x -> o.perform());
  60. }
  61. chain.join();
  62.  
  63. System.out.println("--- END #2 ---\n");
  64.  
  65. }
  66.  
  67. static void run_v3(Stream<Operation> operationsStream) {
  68. System.out.println("--- START #3 ---");
  69.  
  70. CompletableFuture<String> all;
  71.  
  72. all = operationsStream
  73. .map(o -> (Supplier<CompletableFuture<String>>) () -> o.perform())
  74. .reduce(() -> CompletableFuture.completedFuture(""),
  75. (chain, next) -> () -> chain.get().thenCompose(s -> next.get())
  76. )
  77. .get();
  78.  
  79. all.join();
  80.  
  81. System.out.println("--- END #3 ---\n");
  82. }
  83.  
  84. static void run_v4(Stream<Operation> operationsStream, boolean parallel) {
  85. System.out.println("--- START #4 " + (parallel ? "(parallel) " : "") + "---");
  86.  
  87. CompletableFuture<String> all;
  88.  
  89. if (parallel) {
  90. operationsStream = operationsStream.parallel();
  91. }
  92.  
  93. all = operationsStream
  94. .reduce(CompletableFuture.completedFuture(""),
  95. (acc, oper) -> acc.thenCompose(s -> {
  96. // System.out.println("about to call perform() on operation #" + oper.getN());
  97. return oper.perform();
  98. }),
  99. (cf1, cf2) -> {
  100. System.out.println("compose function");
  101. return cf1.thenCompose(r -> cf2);
  102. }
  103. );
  104.  
  105. all.join();
  106.  
  107. System.out.println("--- END #4 ---\n");
  108. }
  109. }
  110.  
  111. class Operation {
  112. private final int n;
  113. private Random r = new Random();
  114.  
  115. Operation(int n) {
  116. this.n = n;
  117. }
  118.  
  119. public int getN() {
  120. return n;
  121. }
  122.  
  123. CompletableFuture<String> perform() {
  124. return CompletableFuture.supplyAsync(this::doWork);
  125. }
  126.  
  127. String doWork() {
  128. System.out.println("Operation " + n + ": doWork() started.");
  129. try {
  130. TimeUnit.MILLISECONDS.sleep(r.nextInt(10) * 100);
  131. } catch (InterruptedException e) {
  132. e.printStackTrace();
  133. }
  134. String result = RandomStringUtils.randomAlphabetic(10);
  135. System.out.println(String.format("Operation %d: result: %s", n, result));
  136. return result;
  137. }
  138. }
RAW Paste Data