Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package eu.pacholczyk.fp;
- import org.apache.commons.lang3.RandomStringUtils;
- import java.util.List;
- import java.util.Random;
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.TimeUnit;
- import java.util.function.Supplier;
- import java.util.stream.IntStream;
- import java.util.stream.Stream;
- import static java.util.concurrent.CompletableFuture.completedFuture;
- import static java.util.stream.Collectors.toList;
- /*
- * Problem: given stream of operations returning CompletableFuture, make sure all the operations are executed
- * sequentially one-by-one - in any order, but with only one operation executing at a time.
- * <br/>
- * First solution (v1) with map+reduce is incorrect as the operation is executed when the next element is requested
- * from the stream, and before the previous had chance to complete
- * <br/>
- * Practical example: calling AWS SDK to create tags for a resource, but due to requests throttling waiting until
- * given tag is created before requesting for the next one.
- */
- public class FuturesTest {
- public static void main(String[] args) {
- run_v1(getOperationsStream(20));
- run_v2(getOperationsStream(20));
- run_v3(getOperationsStream(20));
- run_v4(getOperationsStream(20), false);
- run_v4(getOperationsStream(20), true);
- }
- static Stream<Operation> getOperationsStream(int length) {
- return IntStream.range(0, length).mapToObj(l -> new Operation(l));
- }
- static void run_v1(Stream<Operation> operationsStream) {
- System.out.println("--- START #1 ---");
- CompletableFuture<String> all;
- all = operationsStream
- .map(o -> o.perform())
- .reduce(CompletableFuture.completedFuture(null), (chain, next) -> chain.thenCompose(s -> next));
- all.join();
- System.out.println("--- END #1 ---\n");
- }
- static void run_v2(Stream<Operation> operationsStream) {
- System.out.println("--- START #2 ---");
- List<Operation> operations = operationsStream.collect(toList());
- CompletableFuture<String> chain = completedFuture(null);
- for (Operation o : operations) {
- chain = chain.thenCompose(x -> o.perform());
- }
- chain.join();
- System.out.println("--- END #2 ---\n");
- }
- static void run_v3(Stream<Operation> operationsStream) {
- System.out.println("--- START #3 ---");
- CompletableFuture<String> all;
- all = operationsStream
- .map(o -> (Supplier<CompletableFuture<String>>) () -> o.perform())
- .reduce(() -> CompletableFuture.completedFuture(""),
- (chain, next) -> () -> chain.get().thenCompose(s -> next.get())
- )
- .get();
- all.join();
- System.out.println("--- END #3 ---\n");
- }
- static void run_v4(Stream<Operation> operationsStream, boolean parallel) {
- System.out.println("--- START #4 " + (parallel ? "(parallel) " : "") + "---");
- CompletableFuture<String> all;
- if (parallel) {
- operationsStream = operationsStream.parallel();
- }
- all = operationsStream
- .reduce(CompletableFuture.completedFuture(""),
- (acc, oper) -> acc.thenCompose(s -> {
- // System.out.println("about to call perform() on operation #" + oper.getN());
- return oper.perform();
- }),
- (cf1, cf2) -> {
- System.out.println("compose function");
- return cf1.thenCompose(r -> cf2);
- }
- );
- all.join();
- System.out.println("--- END #4 ---\n");
- }
- }
- class Operation {
- private final int n;
- private Random r = new Random();
- Operation(int n) {
- this.n = n;
- }
- public int getN() {
- return n;
- }
- CompletableFuture<String> perform() {
- return CompletableFuture.supplyAsync(this::doWork);
- }
- String doWork() {
- System.out.println("Operation " + n + ": doWork() started.");
- try {
- TimeUnit.MILLISECONDS.sleep(r.nextInt(10) * 100);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- String result = RandomStringUtils.randomAlphabetic(10);
- System.out.println(String.format("Operation %d: result: %s", n, result));
- return result;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement