SHARE
TWEET

ExecUtil for Hazelcast

tpeierls Mar 9th, 2012 245 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1.  
  2. package com.hazelcast.util;
  3.  
  4. import com.google.common.base.Function;
  5. import com.google.common.base.Supplier;
  6. import static com.google.common.base.Preconditions.checkNotNull;
  7.  
  8. import com.google.common.collect.ImmutableList;
  9. import com.google.common.collect.Iterables;
  10.  
  11. import com.google.common.util.concurrent.ForwardingExecutorService;
  12. import com.google.common.util.concurrent.FutureCallback;
  13. import com.google.common.util.concurrent.Futures;
  14.  
  15. import com.hazelcast.core.DistributedTask;
  16. import com.hazelcast.core.ExecutionCallback;
  17. import com.hazelcast.core.HazelcastInstance;
  18.  
  19. import java.io.Serializable;
  20.  
  21. import java.util.Collection;
  22. import java.util.List;
  23.  
  24. import java.util.concurrent.Callable;
  25. import java.util.concurrent.CountDownLatch;
  26. import java.util.concurrent.ExecutionException;
  27. import java.util.concurrent.ExecutorService;
  28. import java.util.concurrent.Future;
  29. import java.util.concurrent.TimeUnit;
  30. import java.util.concurrent.atomic.AtomicInteger;
  31.  
  32.  
  33. /**
  34.  * Methods to invoke multiple {@link DistributedTask}s and wait for them
  35.  * to complete, and an ExecutorService wrapper that adds a
  36.  * distributed {@link ExecutorService#invokeAll invokeAll} method
  37.  * to Hazelcast {@link HazelcastInstance#getExecutorService ExecutorService}s.
  38.  */
  39. public final class ExecUtil {
  40.  
  41.     /**
  42.      * Submits the given tasks on the supplied executor and
  43.      * waits indefinitely for them to finish.
  44.      * @param exec must be an {@link ExecutorService} provided by Hazelcast
  45.      */
  46.     public static void invokeAll(
  47.             ExecutorService exec,
  48.             Iterable<DistributedTask<Void>> tasks)
  49.             throws InterruptedException {
  50.  
  51.         invokeAll(exec, tasks, -1, null);
  52.     }
  53.  
  54.  
  55.     /**
  56.      * Submits the given tasks on the supplied executor and
  57.      * waits indefinitely for them to finish, processing any
  58.      * results with the given {@link ExecutionCallback}.
  59.      * @param exec must be an {@link ExecutorService} provided by Hazelcast
  60.      */
  61.     public static <T> void invokeAll(
  62.             ExecutorService exec,
  63.             Iterable<DistributedTask<T>> tasks,
  64.             ExecutionCallback<T> callback)
  65.             throws InterruptedException {
  66.  
  67.         invokeAll(exec, tasks, -1, null, callback);
  68.     }
  69.  
  70.     /**
  71.      * Submits the given tasks on the supplied executor and
  72.      * waits indefinitely for them to finish, processing any
  73.      * results with the given {@link FutureCallback}.
  74.      * @param exec must be an {@link ExecutorService} provided by Hazelcast
  75.      */
  76.     public static <T> void invokeAll(
  77.             ExecutorService exec,
  78.             Iterable<DistributedTask<T>> tasks,
  79.             FutureCallback<T> callback)
  80.             throws InterruptedException {
  81.  
  82.         invokeAll(exec, tasks, -1, null, callback);
  83.     }
  84.  
  85.     /**
  86.      * Submits the given tasks on the supplied executor and
  87.      * waits up to the given amount of time for them to finish.
  88.      * @param exec must be an {@link ExecutorService} provided by Hazelcast
  89.      * @return true unless the time elapsed before completion
  90.      */
  91.     public static boolean invokeAll(
  92.             ExecutorService exec,
  93.             Iterable<DistributedTask<Void>> tasks,
  94.             long timeout,
  95.             TimeUnit unit)
  96.             throws InterruptedException {
  97.  
  98.         return invokeAll(exec, tasks, timeout, unit, (ExecutionCallback<Void>) null);
  99.     }
  100.  
  101.     /**
  102.      * Submits the given tasks on the supplied executor and
  103.      * waits up to the given amount of time for them to finish,
  104.      * processing any results with the given {@link FutureCallback}.
  105.      * @param exec must be an {@link ExecutorService} provided by Hazelcast
  106.      * @return true unless the time elapsed before completion
  107.      */
  108.     public static <T> boolean invokeAll(
  109.             ExecutorService exec,
  110.             Iterable<DistributedTask<T>> tasks,
  111.             long timeout,
  112.             TimeUnit unit,
  113.             final ExecutionCallback<T> callback)
  114.             throws InterruptedException {
  115.  
  116.         checkNotNull(exec);
  117.         checkNotNull(tasks);
  118.  
  119.         final CountDownLatch allDone = new CountDownLatch(1);
  120.  
  121.         // Initially 2 to prevent latch from opening early due to race-y
  122.         // task completions. (Note that 2 is necessary and sufficient.)
  123.         final AtomicInteger tasksLeft = new AtomicInteger(2);
  124.  
  125.         for (final DistributedTask<T> task : tasks) {
  126.             final ExecutionCallback<T> prev = task.getExecutionCallback();
  127.             task.setExecutionCallback(new ExecutionCallback<T>() {
  128.                 public void done(Future<T> future) {
  129.                     try {
  130.                         if (prev != null) {
  131.                             prev.done(future);
  132.                         }
  133.                     } finally {
  134.                         try {
  135.                             if (callback != null) {
  136.                                 callback.done(future);
  137.                             }
  138.                         } finally {
  139.                             if (tasksLeft.decrementAndGet() == 0) {
  140.                                 allDone.countDown();
  141.                             }
  142.                         }
  143.                     }
  144.                 }
  145.             });
  146.  
  147.             try {
  148.                 exec.execute(task);
  149.  
  150.                 // Only incremented if submission is successful.
  151.                 tasksLeft.incrementAndGet();
  152.  
  153.             } catch (RuntimeException ex) {
  154.                 if (callback == null) {
  155.                     throw ex;
  156.                 } else {
  157.                     callback.done(Futures.<T>immediateFailedFuture(ex));
  158.                 }
  159.             }
  160.         }
  161.  
  162.         // Remove the 2 that tasksLeft was initialized with.
  163.         if (tasksLeft.addAndGet(-2) == 0) {
  164.             allDone.countDown();
  165.         }
  166.  
  167.         if (timeout < 0 || unit == null) {
  168.             allDone.await();
  169.             return true;
  170.         } else {
  171.             return allDone.await(timeout, unit);
  172.         }
  173.     }
  174.  
  175.     /**
  176.      * Submits the given tasks on the supplied executor and
  177.      * waits up to the given amount of time for them to finish,
  178.      * processing any results with the given execution callback.
  179.      * @param exec must be an {@link ExecutorService} provided by Hazelcast
  180.      * @return true unless the time elapsed before completion
  181.      */
  182.     public static <T> boolean invokeAll(
  183.             ExecutorService exec,
  184.             Iterable<DistributedTask<T>> tasks,
  185.             long timeout,
  186.             TimeUnit unit,
  187.             final FutureCallback<T> callback)
  188.             throws InterruptedException {
  189.  
  190.         checkNotNull(callback);
  191.  
  192.         return invokeAll(exec, tasks, timeout, unit, new ExecutionCallback<T>() {
  193.             public void done(Future<T> future) {
  194.                 doFutureCallback(callback, future);
  195.             }
  196.         });
  197.     }
  198.  
  199.  
  200.  
  201.     /**
  202.      * Returns an {@link ExecutorService} based on {@code exec}
  203.      * that supports {@link ExecutorService#invokeAll invokeAll}
  204.      * even if {@code exec} is a Hazelcast-based ExecutorService
  205.      * (which doesn't support {@code invokeAll} out of the box).
  206.      * The restriction is that the {@link Callable} tasks must be
  207.      * {@link Serializable}. If a Callable task implements {@link Supplier},
  208.      * that Supplier is used to get an Object used as a key passed to the
  209.      * {@link DistributedTask} constructor.
  210.      */
  211.     public static ExecutorService withInvokeAll(final ExecutorService exec) {
  212.         if (isHazelcast(exec)) {
  213.             return new ForwardingExecutorService() {
  214.  
  215.                 @SuppressWarnings("unchecked")
  216.                 @Override
  217.                 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  218.                         throws InterruptedException {
  219.                     return doInvokeAll((Collection<Callable<T>>) tasks, -1, null);
  220.                 }
  221.  
  222.                 @SuppressWarnings("unchecked")
  223.                 @Override
  224.                 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  225.                         long timeout, TimeUnit unit) throws InterruptedException {
  226.  
  227.                     return doInvokeAll((Collection<Callable<T>>) tasks, timeout, unit);
  228.                 }
  229.  
  230.                 protected final ExecutorService delegate() {
  231.                     return exec;
  232.                 }
  233.  
  234.                 private <T> List<Future<T>> doInvokeAll(Collection<Callable<T>> tasks, long timeout, TimeUnit unit)
  235.                         throws InterruptedException {
  236.  
  237.                     List<DistributedTask<T>> dtasks = ImmutableList.copyOf(Iterables.transform(tasks,
  238.                         new Function<Callable<T>, DistributedTask<T>>() {
  239.                             public DistributedTask<T> apply(Callable<T> callable) {
  240.                                 return callableToTask(callable);
  241.                             }
  242.                         }
  243.                     ));
  244.  
  245.                     ExecUtil.invokeAll(exec, dtasks, timeout, unit, (ExecutionCallback<T>) null);
  246.  
  247.                     @SuppressWarnings("unchecked")
  248.                     List<Future<T>> results = (List<Future<T>>) (List<?>) dtasks;
  249.  
  250.                     return results;
  251.                 }
  252.             };
  253.         } else {
  254.             return exec;
  255.         }
  256.     }
  257.  
  258.  
  259.     /**
  260.      * Returns whether the given {@link ExecutorService} was
  261.      * produced by a call to Hazelcast's
  262.      * {@link HazelcastInstance#getExecutorService getExecutorService}.
  263.      */
  264.     public static boolean isHazelcast(ExecutorService exec) {
  265.         return exec.getClass().getName().matches("^.*hazelcast.*$");
  266.     }
  267.  
  268.  
  269.     private static <T> DistributedTask<T> callableToTask(Callable<T> callable) {
  270.         if (callable instanceof Supplier) {
  271.             return new DistributedTask<T>(callable,
  272.                 Supplier.class.cast(callable).get());
  273.         } else {
  274.             return new DistributedTask<T>(callable);
  275.         }
  276.     }
  277.  
  278.     private static <T> void doFutureCallback(FutureCallback<T> callback, Future<T> future) {
  279.         try {
  280.             callback.onSuccess(future.get());
  281.         } catch (ExecutionException ex) {
  282.             callback.onFailure(ex.getCause());
  283.         } catch (InterruptedException ex) {
  284.             Thread.currentThread().interrupt();
  285.         }
  286.     }
  287.  
  288.  
  289.     private ExecUtil() { /* uninstantiable */ }
  290. }
RAW Paste Data
Top