G2A Many GEOs
SHARE
TWEET

HazelcastAsyncFunction

tpeierls Mar 12th, 2012 449 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. /*
  2.  * This code is placed in the public domain by its author, Tim Peierls.
  3.  */
  4. package com.hazelcast.util;
  5.  
  6. import com.google.common.base.Function;
  7. import static com.google.common.base.Preconditions.checkArgument;
  8. import static com.google.common.base.Preconditions.checkNotNull;
  9.  
  10. import com.google.common.util.concurrent.AsyncFunction;
  11. import com.google.common.util.concurrent.ForwardingListenableFuture;
  12. import com.google.common.util.concurrent.Futures;
  13. import com.google.common.util.concurrent.ListenableFuture;
  14. import com.google.common.util.concurrent.SettableFuture;
  15.  
  16. import com.hazelcast.core.DistributedTask;
  17. import com.hazelcast.core.ExecutionCallback;
  18. import com.hazelcast.core.Hazelcast;
  19. import com.hazelcast.core.HazelcastInstance;
  20.  
  21. import java.io.Serializable;
  22.  
  23. import java.util.concurrent.Callable;
  24. import java.util.concurrent.Executor;
  25. import java.util.concurrent.ExecutionException;
  26. import java.util.concurrent.Future;
  27.  
  28.  
  29. /**
  30.  * An implementation of {@link AsyncFunction} that is applied in
  31.  * a Hazelcast distributed task, potentially on a different node
  32.  * of the Hazelcast cluster than the one on which {@link #apply}
  33.  * is called.
  34.  */
  35. public class HazelcastAsyncFunction<K, V> implements AsyncFunction<K, V> {
  36.  
  37.     /**
  38.      * Creates a builder of {@link AsyncFunction} instances that
  39.      * will evaluate the given {@link Function} asynchronously.
  40.      */
  41.     public static <K, V> HazelcastAsyncFunction<K, V> from(Function<K, V> function) {
  42.         checkArgument(function instanceof Serializable,
  43.             "function must be serializable");
  44.  
  45.         return new HazelcastAsyncFunction<K, V>(function);
  46.     }
  47.  
  48.     /**
  49.      * Creates a HazelcastAsyncFunction identical to this one but running
  50.      * function applications on the given Hazelcast {@link Executor}.
  51.      *
  52.      * @throws IllegalArgumentException if exec is not a
  53.      * Hazelcast Executor
  54.      */
  55.     public HazelcastAsyncFunction<K, V> onExecutor(Executor exec) {
  56.         checkArgument(supportsDistributedExecution(checkNotNull(exec, "exec")),
  57.             "exec must be a Hazelcast-managed Executor");
  58.         return new HazelcastAsyncFunction<K, V>(function, exec, null, taskKeyFunction);
  59.     }
  60.  
  61.     /**
  62.      * Creates a HazelcastAsyncFunction identical to this one but running
  63.      * function applications on the named Hazelcast {@link Executor}.
  64.      */
  65.     public HazelcastAsyncFunction<K, V> onExecutorNamed(String name) {
  66.         return new HazelcastAsyncFunction<K, V>(function, null, name, taskKeyFunction);
  67.     }
  68.  
  69.     /**
  70.      * Creates a HazelcastAsyncFunction identical to this one but running
  71.      * function applications on the default Hazelcast {@link Executor}.
  72.      * This is the default behavior.
  73.      */
  74.     public HazelcastAsyncFunction<K, V> onDefaultExecutor() {
  75.         return new HazelcastAsyncFunction<K, V>(function, null, null, taskKeyFunction);
  76.     }
  77.  
  78.     /**
  79.      * Creates a HazelcastAsyncFunction identical to this one, but using the
  80.      * supplied task key function to map input values to a
  81.      * task key object that Hazelcast will use to determine
  82.      * which cluster node to run a function application on.
  83.      */
  84.     public HazelcastAsyncFunction<K, V> withTaskKeyFunction(Function<K, Object> taskKeyFunction) {
  85.         return new HazelcastAsyncFunction<K, V>(function, exec, name,
  86.             checkNotNull(taskKeyFunction, "taskKeyFunction"));
  87.     }
  88.  
  89.     /**
  90.      * Creates a HazelcastAsyncFunction identical to this one, but using the
  91.      * input values as task keys.
  92.      */
  93.     public HazelcastAsyncFunction<K, V> withInputsAsTaskKeys() {
  94.         Function<K, Object> identityTaskKeyFunction = new Function<K, Object>() {
  95.             public Object apply(K input) { return input; }
  96.         };
  97.         return new HazelcastAsyncFunction<K, V>(function, exec, name, identityTaskKeyFunction);
  98.     }
  99.  
  100.     /**
  101.      * Creates a HazelcastAsyncFunction identical to this one, but not providing
  102.      * task keys, letting Hazelcast decide which node to perform function
  103.      * applications on. This is the default behavior.
  104.      */
  105.     public HazelcastAsyncFunction<K, V> withNoTaskKeys(Function<K, Object> taskKeyFunction) {
  106.         return new HazelcastAsyncFunction<K, V>(function, exec, name, null);
  107.     }
  108.  
  109.  
  110.     public ListenableFuture<V> apply(K key) {
  111.         if (key instanceof Serializable) {
  112.             CallbackFuture<V> future = new CallbackFuture<V>();
  113.  
  114.             Callable<V> callable = new FunctionApplyingCallable<K, V>(function, key);
  115.             DistributedTask<V> task = taskKeyFunction == null
  116.                 ? new DistributedTask<V>(callable)
  117.                 : new DistributedTask<V>(callable, taskKeyFunction.apply(key))
  118.                 ;
  119.             task.setExecutionCallback(future);
  120.             getExecutor().execute(task);
  121.  
  122.             return future;
  123.         } else {
  124.             // Key isn't serializable, so we apply the function
  125.             // synchronously rather than throw an exception.
  126.             return Futures.immediateFuture(function.apply(key));
  127.         }
  128.     }
  129.  
  130.     static class CallbackFuture<V>
  131.             extends ForwardingListenableFuture<V>
  132.             implements ExecutionCallback<V> {
  133.  
  134.         public void done(Future<V> future) {
  135.             try {
  136.                 delegate.set(future.get());
  137.             } catch (InterruptedException ex) {
  138.                 Thread.currentThread().interrupt();
  139.             } catch (ExecutionException ex) {
  140.                 delegate.setException(ex.getCause());
  141.             } catch (Throwable ex) {
  142.                 delegate.setException(ex);
  143.             }
  144.         }
  145.  
  146.         @Override protected final ListenableFuture<V> delegate() {
  147.             return delegate;
  148.         }
  149.  
  150.         private final SettableFuture<V> delegate =
  151.             SettableFuture.create();
  152.     }
  153.  
  154.  
  155.     static class FunctionApplyingCallable<K, V>
  156.             implements Callable<V>, Serializable {
  157.  
  158.         FunctionApplyingCallable(Function<K, V> function, K input) {
  159.             this.function = function;
  160.             this.input = input;
  161.         }
  162.  
  163.         public V call() {
  164.             return function.apply(input);
  165.         }
  166.  
  167.         private final Function<K, V> function;
  168.         private final K input;
  169.     }
  170.  
  171.  
  172.     HazelcastAsyncFunction(Function<K, V> function) {
  173.         this.function = function;
  174.         this.exec = null;
  175.         this.name = null;
  176.         this.taskKeyFunction = null;
  177.     }
  178.  
  179.     HazelcastAsyncFunction(Function<K, V> function,
  180.                            Executor exec,
  181.                            String name,
  182.                            Function<K, Object> taskKeyFunction) {
  183.  
  184.         this.function = function;
  185.         this.exec = exec;
  186.         this.name = name;
  187.         this.taskKeyFunction = taskKeyFunction;
  188.     }
  189.  
  190.     /**
  191.      * Returns whether the given {@link Executor} supports distributed
  192.      * execution by checking if it was produced by a call to Hazelcast's
  193.      * {@link HazelcastInstance#getExecutorService getExecutorService}.
  194.      */
  195.     private static boolean supportsDistributedExecution(Executor exec) {
  196.         return exec.getClass().getName().matches("^.*hazelcast.*$");
  197.     }
  198.  
  199.     private Executor getExecutor() {
  200.         // Avoid calling Hazelcast methods until we actually
  201.         // use the Executor.
  202.         if (exec == null) {
  203.             if (name == null) {
  204.                 return Hazelcast.getExecutorService();
  205.             } else {
  206.                 return Hazelcast.getExecutorService(name);
  207.             }
  208.         } else {
  209.             return exec;
  210.         }
  211.     }
  212.  
  213.     private final Function<K, V> function;
  214.     private final Executor exec;
  215.     private final String name;
  216.     private final Function<K, Object> taskKeyFunction;
  217. }
RAW Paste Data
Ledger Nano X - The secure hardware wallet
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
Top