Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.ThreadLocalRandom;
- import java.util.concurrent.TimeUnit;
- import java.util.function.Function;
- import java.util.stream.Stream;
- abstract class Monad<T> {
- static <U> Monad<U> unit(U element) { return null; }
- abstract <U> Monad<U> bind(Function<T, Monad<U>> f);
- }
- class Wrapper<T> extends Monad<T> {
- private T value;
- private Wrapper(T element) { this.value = element; }
- static <U> Monad<U> unit(U element) { return new Wrapper<>(element); }
- public <U> Monad<U> bind(Function<T, Monad<U>> f) { return f.apply(this.value); }
- }
- public class Main {
- private static int result;
- /**
- * This method represents a "query" to a database. It waits for 2 seconds
- * before returning a randomly generated integer that can then be used
- * for further processing
- *
- * @return Randomly generated integer representing a query from a database
- */
- private static CompletableFuture<Integer> queryDB() {
- return CompletableFuture.supplyAsync(() -> {
- System.out.println("Querying DB...");
- try {
- TimeUnit.SECONDS.sleep(2);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- int queryValue = ThreadLocalRandom.current().nextInt(0, 100);
- System.out.printf("Query complete! Returning %d%n", queryValue);
- return queryValue;
- });
- }
- /**
- * This is a composable function that takes an integer and returns a
- * Future where the integer has been modified.In this case, by
- * multiplying it by 2 and adding 1. Intended to be used in a
- * flatMap or Bind operation
- *
- * @param i Integer to be modified
- * @return Future containing the modified value
- */
- private static CompletableFuture<Integer> modifyInteger(int i) {
- return CompletableFuture.supplyAsync(() -> i * 2 + 1);
- }
- /**
- * This Action will execute a read on the result variable at a future
- * point in time, and is composable in a chain of other monadic futures
- *
- * @return Future of Void representing the Action of reading the value
- * from the result variable at a future point in time
- */
- private static CompletableFuture<Void> readResult() {
- return CompletableFuture.runAsync(() -> System.out.printf("Read value %d from result%n", result));
- }
- /**
- * This Action will execute a write on the result variable at a future
- * point in time, and is composable in a chain of other monadic futures
- *
- * @param i Value to write into the result variable
- * @return Future of Void representing the Action of writing the value
- * passed in to the result variable at a future point in time
- */
- private static CompletableFuture<Void> writeResult(int i) {
- return CompletableFuture.runAsync(() -> {
- result = i;
- System.out.printf("Wrote value %d to result variable %n", i);
- }
- );
- }
- public static void main(String[] args) {
- CompletableFuture<Void> result = Stream.generate(Main::queryDB)
- .limit(5)
- .map(future -> future.thenCompose(Main::modifyInteger))
- // This is an operation on Streams, rather than on Futures. It
- // takes a Stream of Futures of ints, and uses a reduction
- // to take it simply to a Future of an int. In this case, it
- // simply adds them up
- .reduce(CompletableFuture.supplyAsync(() -> 0), (x, y) -> x.thenCombine(y, Integer::sum))
- .thenCompose(Main::writeResult)
- .thenRun(Main::readResult);
- // The blocking wait here is due to the fact that this program will
- // have nothing left to do after running these commands, so without
- // this it would exit before printing the result. If it were a
- // long-running process, like a web-server, or had other work to do,
- // it would be unnecessary
- while (!result.isDone()) {}
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement