Advertisement
tpeierls

HazelcastAsyncFunction

Mar 12th, 2012
730
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5 7.67 KB | None | 0 0
  1. /*
  2.  * This code is placed in the public domain by its author, Tim Peierls.
  3.  */
  4. package com.hazelcast.util;
  5.  
  6. import com.google.common.base.Function;
  7. import static com.google.common.base.Preconditions.checkArgument;
  8. import static com.google.common.base.Preconditions.checkNotNull;
  9.  
  10. import com.google.common.util.concurrent.AsyncFunction;
  11. import com.google.common.util.concurrent.ForwardingListenableFuture;
  12. import com.google.common.util.concurrent.Futures;
  13. import com.google.common.util.concurrent.ListenableFuture;
  14. import com.google.common.util.concurrent.SettableFuture;
  15.  
  16. import com.hazelcast.core.DistributedTask;
  17. import com.hazelcast.core.ExecutionCallback;
  18. import com.hazelcast.core.Hazelcast;
  19. import com.hazelcast.core.HazelcastInstance;
  20.  
  21. import java.io.Serializable;
  22.  
  23. import java.util.concurrent.Callable;
  24. import java.util.concurrent.Executor;
  25. import java.util.concurrent.ExecutionException;
  26. import java.util.concurrent.Future;
  27.  
  28.  
  29. /**
  30.  * An implementation of {@link AsyncFunction} that is applied in
  31.  * a Hazelcast distributed task, potentially on a different node
  32.  * of the Hazelcast cluster than the one on which {@link #apply}
  33.  * is called.
  34.  */
  35. public class HazelcastAsyncFunction<K, V> implements AsyncFunction<K, V> {
  36.  
  37.     /**
  38.      * Creates a builder of {@link AsyncFunction} instances that
  39.      * will evaluate the given {@link Function} asynchronously.
  40.      */
  41.     public static <K, V> HazelcastAsyncFunction<K, V> from(Function<K, V> function) {
  42.         checkArgument(function instanceof Serializable,
  43.             "function must be serializable");
  44.  
  45.         return new HazelcastAsyncFunction<K, V>(function);
  46.     }
  47.  
  48.     /**
  49.      * Creates a HazelcastAsyncFunction identical to this one but running
  50.      * function applications on the given Hazelcast {@link Executor}.
  51.      *
  52.      * @throws IllegalArgumentException if exec is not a
  53.      * Hazelcast Executor
  54.      */
  55.     public HazelcastAsyncFunction<K, V> onExecutor(Executor exec) {
  56.         checkArgument(supportsDistributedExecution(checkNotNull(exec, "exec")),
  57.             "exec must be a Hazelcast-managed Executor");
  58.         return new HazelcastAsyncFunction<K, V>(function, exec, null, taskKeyFunction);
  59.     }
  60.  
  61.     /**
  62.      * Creates a HazelcastAsyncFunction identical to this one but running
  63.      * function applications on the named Hazelcast {@link Executor}.
  64.      */
  65.     public HazelcastAsyncFunction<K, V> onExecutorNamed(String name) {
  66.         return new HazelcastAsyncFunction<K, V>(function, null, name, taskKeyFunction);
  67.     }
  68.  
  69.     /**
  70.      * Creates a HazelcastAsyncFunction identical to this one but running
  71.      * function applications on the default Hazelcast {@link Executor}.
  72.      * This is the default behavior.
  73.      */
  74.     public HazelcastAsyncFunction<K, V> onDefaultExecutor() {
  75.         return new HazelcastAsyncFunction<K, V>(function, null, null, taskKeyFunction);
  76.     }
  77.  
  78.     /**
  79.      * Creates a HazelcastAsyncFunction identical to this one, but using the
  80.      * supplied task key function to map input values to a
  81.      * task key object that Hazelcast will use to determine
  82.      * which cluster node to run a function application on.
  83.      */
  84.     public HazelcastAsyncFunction<K, V> withTaskKeyFunction(Function<K, Object> taskKeyFunction) {
  85.         return new HazelcastAsyncFunction<K, V>(function, exec, name,
  86.             checkNotNull(taskKeyFunction, "taskKeyFunction"));
  87.     }
  88.  
  89.     /**
  90.      * Creates a HazelcastAsyncFunction identical to this one, but using the
  91.      * input values as task keys.
  92.      */
  93.     public HazelcastAsyncFunction<K, V> withInputsAsTaskKeys() {
  94.         Function<K, Object> identityTaskKeyFunction = new Function<K, Object>() {
  95.             public Object apply(K input) { return input; }
  96.         };
  97.         return new HazelcastAsyncFunction<K, V>(function, exec, name, identityTaskKeyFunction);
  98.     }
  99.  
  100.     /**
  101.      * Creates a HazelcastAsyncFunction identical to this one, but not providing
  102.      * task keys, letting Hazelcast decide which node to perform function
  103.      * applications on. This is the default behavior.
  104.      */
  105.     public HazelcastAsyncFunction<K, V> withNoTaskKeys(Function<K, Object> taskKeyFunction) {
  106.         return new HazelcastAsyncFunction<K, V>(function, exec, name, null);
  107.     }
  108.  
  109.  
  110.     public ListenableFuture<V> apply(K key) {
  111.         if (key instanceof Serializable) {
  112.             CallbackFuture<V> future = new CallbackFuture<V>();
  113.  
  114.             Callable<V> callable = new FunctionApplyingCallable<K, V>(function, key);
  115.             DistributedTask<V> task = taskKeyFunction == null
  116.                 ? new DistributedTask<V>(callable)
  117.                 : new DistributedTask<V>(callable, taskKeyFunction.apply(key))
  118.                 ;
  119.             task.setExecutionCallback(future);
  120.             getExecutor().execute(task);
  121.  
  122.             return future;
  123.         } else {
  124.             // Key isn't serializable, so we apply the function
  125.             // synchronously rather than throw an exception.
  126.             return Futures.immediateFuture(function.apply(key));
  127.         }
  128.     }
  129.  
  130.     static class CallbackFuture<V>
  131.             extends ForwardingListenableFuture<V>
  132.             implements ExecutionCallback<V> {
  133.  
  134.         public void done(Future<V> future) {
  135.             try {
  136.                 delegate.set(future.get());
  137.             } catch (InterruptedException ex) {
  138.                 Thread.currentThread().interrupt();
  139.             } catch (ExecutionException ex) {
  140.                 delegate.setException(ex.getCause());
  141.             } catch (Throwable ex) {
  142.                 delegate.setException(ex);
  143.             }
  144.         }
  145.  
  146.         @Override protected final ListenableFuture<V> delegate() {
  147.             return delegate;
  148.         }
  149.  
  150.         private final SettableFuture<V> delegate =
  151.             SettableFuture.create();
  152.     }
  153.  
  154.  
  155.     static class FunctionApplyingCallable<K, V>
  156.             implements Callable<V>, Serializable {
  157.  
  158.         FunctionApplyingCallable(Function<K, V> function, K input) {
  159.             this.function = function;
  160.             this.input = input;
  161.         }
  162.  
  163.         public V call() {
  164.             return function.apply(input);
  165.         }
  166.  
  167.         private final Function<K, V> function;
  168.         private final K input;
  169.     }
  170.  
  171.  
  172.     HazelcastAsyncFunction(Function<K, V> function) {
  173.         this.function = function;
  174.         this.exec = null;
  175.         this.name = null;
  176.         this.taskKeyFunction = null;
  177.     }
  178.  
  179.     HazelcastAsyncFunction(Function<K, V> function,
  180.                            Executor exec,
  181.                            String name,
  182.                            Function<K, Object> taskKeyFunction) {
  183.  
  184.         this.function = function;
  185.         this.exec = exec;
  186.         this.name = name;
  187.         this.taskKeyFunction = taskKeyFunction;
  188.     }
  189.  
  190.     /**
  191.      * Returns whether the given {@link Executor} supports distributed
  192.      * execution by checking if it was produced by a call to Hazelcast's
  193.      * {@link HazelcastInstance#getExecutorService getExecutorService}.
  194.      */
  195.     private static boolean supportsDistributedExecution(Executor exec) {
  196.         return exec.getClass().getName().matches("^.*hazelcast.*$");
  197.     }
  198.  
  199.     private Executor getExecutor() {
  200.         // Avoid calling Hazelcast methods until we actually
  201.         // use the Executor.
  202.         if (exec == null) {
  203.             if (name == null) {
  204.                 return Hazelcast.getExecutorService();
  205.             } else {
  206.                 return Hazelcast.getExecutorService(name);
  207.             }
  208.         } else {
  209.             return exec;
  210.         }
  211.     }
  212.  
  213.     private final Function<K, V> function;
  214.     private final Executor exec;
  215.     private final String name;
  216.     private final Function<K, Object> taskKeyFunction;
  217. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement