SHARE
TWEET

Untitled

a guest Oct 13th, 2019 67 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package eu.pacholczyk.fp;
  2.  
  3. import org.apache.commons.lang3.RandomStringUtils;
  4.  
  5. import java.util.List;
  6. import java.util.Random;
  7. import java.util.concurrent.CompletableFuture;
  8. import java.util.concurrent.TimeUnit;
  9. import java.util.function.Supplier;
  10. import java.util.stream.IntStream;
  11. import java.util.stream.Stream;
  12.  
  13. import static java.util.concurrent.CompletableFuture.completedFuture;
  14. import static java.util.stream.Collectors.toList;
  15.  
  16. /*
  17.  * Problem: given stream of operations returning CompletableFuture, make sure all the operations are executed
  18.  * sequentially one-by-one - in any order, but with only one operation executing at a time.
  19.  * <br/>
  20.  * First solution (v1) with map+reduce is incorrect as the operation is executed when the next element is requested
  21.  * from the stream, and before the previous had chance to complete
  22.  * <br/>
  23.  * Practical example: calling AWS SDK to create tags for a resource, but due to requests throttling waiting until
  24.  * given tag is created before requesting for the next one.
  25.  */
  26. public class FuturesTest {
  27.     public static void main(String[] args) {
  28.         run_v1(getOperationsStream(20));
  29.         run_v2(getOperationsStream(20));
  30.         run_v3(getOperationsStream(20));
  31.         run_v4(getOperationsStream(20), false);
  32.         run_v4(getOperationsStream(20), true);
  33.     }
  34.  
  35.     static Stream<Operation> getOperationsStream(int length) {
  36.         return IntStream.range(0, length).mapToObj(l -> new Operation(l));
  37.     }
  38.  
  39.     static void run_v1(Stream<Operation> operationsStream) {
  40.         System.out.println("--- START #1 ---");
  41.  
  42.         CompletableFuture<String> all;
  43.  
  44.         all = operationsStream
  45.                 .map(o -> o.perform())
  46.                 .reduce(CompletableFuture.completedFuture(null), (chain, next) -> chain.thenCompose(s -> next));
  47.  
  48.         all.join();
  49.         System.out.println("--- END #1 ---\n");
  50.     }
  51.  
  52.     static void run_v2(Stream<Operation> operationsStream) {
  53.         System.out.println("--- START #2 ---");
  54.  
  55.         List<Operation> operations = operationsStream.collect(toList());
  56.  
  57.         CompletableFuture<String> chain = completedFuture(null);
  58.         for (Operation o : operations) {
  59.             chain = chain.thenCompose(x -> o.perform());
  60.         }
  61.         chain.join();
  62.  
  63.         System.out.println("--- END #2 ---\n");
  64.  
  65.     }
  66.  
  67.     static void run_v3(Stream<Operation> operationsStream) {
  68.         System.out.println("--- START #3 ---");
  69.  
  70.         CompletableFuture<String> all;
  71.  
  72.         all = operationsStream
  73.                 .map(o -> (Supplier<CompletableFuture<String>>) () -> o.perform())
  74.                 .reduce(() -> CompletableFuture.completedFuture(""),
  75.                         (chain, next) -> () -> chain.get().thenCompose(s -> next.get())
  76.                 )
  77.                 .get();
  78.  
  79.         all.join();
  80.  
  81.         System.out.println("--- END #3 ---\n");
  82.     }
  83.  
  84.     static void run_v4(Stream<Operation> operationsStream, boolean parallel) {
  85.         System.out.println("--- START #4 " + (parallel ? "(parallel) " : "") + "---");
  86.  
  87.         CompletableFuture<String> all;
  88.  
  89.         if (parallel) {
  90.             operationsStream = operationsStream.parallel();
  91.         }
  92.  
  93.         all = operationsStream
  94.             .reduce(CompletableFuture.completedFuture(""),
  95.                 (acc, oper) -> acc.thenCompose(s -> {
  96. //                    System.out.println("about to call perform() on operation #" + oper.getN());
  97.                     return oper.perform();
  98.                 }),
  99.                 (cf1, cf2) -> {
  100.                     System.out.println("compose function");
  101.                     return cf1.thenCompose(r -> cf2);
  102.                 }
  103.                 );
  104.  
  105.         all.join();
  106.  
  107.         System.out.println("--- END #4 ---\n");
  108.     }
  109. }
  110.  
  111. class Operation {
  112.     private final int n;
  113.     private Random r = new Random();
  114.  
  115.     Operation(int n) {
  116.         this.n = n;
  117.     }
  118.  
  119.     public int getN() {
  120.         return n;
  121.     }
  122.  
  123.     CompletableFuture<String> perform() {
  124.         return CompletableFuture.supplyAsync(this::doWork);
  125.     }
  126.  
  127.     String doWork() {
  128.         System.out.println("Operation " + n + ": doWork() started.");
  129.         try {
  130.             TimeUnit.MILLISECONDS.sleep(r.nextInt(10) * 100);
  131.         } catch (InterruptedException e) {
  132.             e.printStackTrace();
  133.         }
  134.         String result = RandomStringUtils.randomAlphabetic(10);
  135.         System.out.println(String.format("Operation %d: result: %s", n, result));
  136.         return result;
  137.     }
  138. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top