Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.hazelcast.util;
- import com.google.common.base.Function;
- import com.google.common.base.Supplier;
- import static com.google.common.base.Preconditions.checkNotNull;
- import com.google.common.collect.ImmutableList;
- import com.google.common.collect.Iterables;
- import com.google.common.util.concurrent.ForwardingExecutorService;
- import com.google.common.util.concurrent.FutureCallback;
- import com.google.common.util.concurrent.Futures;
- import com.hazelcast.core.DistributedTask;
- import com.hazelcast.core.ExecutionCallback;
- import com.hazelcast.core.HazelcastInstance;
- import java.io.Serializable;
- import java.util.Collection;
- import java.util.List;
- import java.util.concurrent.Callable;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Future;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.atomic.AtomicInteger;
- /**
- * Methods to invoke multiple {@link DistributedTask}s and wait for them
- * to complete, and an ExecutorService wrapper that adds a
- * distributed {@link ExecutorService#invokeAll invokeAll} method
- * to Hazelcast {@link HazelcastInstance#getExecutorService ExecutorService}s.
- */
- public final class ExecUtil {
- /**
- * Submits the given tasks on the supplied executor and
- * waits indefinitely for them to finish.
- * @param exec must be an {@link ExecutorService} provided by Hazelcast
- */
- public static void invokeAll(
- ExecutorService exec,
- Iterable<DistributedTask<Void>> tasks)
- throws InterruptedException {
- invokeAll(exec, tasks, -1, null);
- }
- /**
- * Submits the given tasks on the supplied executor and
- * waits indefinitely for them to finish, processing any
- * results with the given {@link ExecutionCallback}.
- * @param exec must be an {@link ExecutorService} provided by Hazelcast
- */
- public static <T> void invokeAll(
- ExecutorService exec,
- Iterable<DistributedTask<T>> tasks,
- ExecutionCallback<T> callback)
- throws InterruptedException {
- invokeAll(exec, tasks, -1, null, callback);
- }
- /**
- * Submits the given tasks on the supplied executor and
- * waits indefinitely for them to finish, processing any
- * results with the given {@link FutureCallback}.
- * @param exec must be an {@link ExecutorService} provided by Hazelcast
- */
- public static <T> void invokeAll(
- ExecutorService exec,
- Iterable<DistributedTask<T>> tasks,
- FutureCallback<T> callback)
- throws InterruptedException {
- invokeAll(exec, tasks, -1, null, callback);
- }
- /**
- * Submits the given tasks on the supplied executor and
- * waits up to the given amount of time for them to finish.
- * @param exec must be an {@link ExecutorService} provided by Hazelcast
- * @return true unless the time elapsed before completion
- */
- public static boolean invokeAll(
- ExecutorService exec,
- Iterable<DistributedTask<Void>> tasks,
- long timeout,
- TimeUnit unit)
- throws InterruptedException {
- return invokeAll(exec, tasks, timeout, unit, (ExecutionCallback<Void>) null);
- }
- /**
- * Submits the given tasks on the supplied executor and
- * waits up to the given amount of time for them to finish,
- * processing any results with the given {@link FutureCallback}.
- * @param exec must be an {@link ExecutorService} provided by Hazelcast
- * @return true unless the time elapsed before completion
- */
- public static <T> boolean invokeAll(
- ExecutorService exec,
- Iterable<DistributedTask<T>> tasks,
- long timeout,
- TimeUnit unit,
- final ExecutionCallback<T> callback)
- throws InterruptedException {
- checkNotNull(exec);
- checkNotNull(tasks);
- final CountDownLatch allDone = new CountDownLatch(1);
- // Initially 2 to prevent latch from opening early due to race-y
- // task completions. (Note that 2 is necessary and sufficient.)
- final AtomicInteger tasksLeft = new AtomicInteger(2);
- for (final DistributedTask<T> task : tasks) {
- final ExecutionCallback<T> prev = task.getExecutionCallback();
- task.setExecutionCallback(new ExecutionCallback<T>() {
- public void done(Future<T> future) {
- try {
- if (prev != null) {
- prev.done(future);
- }
- } finally {
- try {
- if (callback != null) {
- callback.done(future);
- }
- } finally {
- if (tasksLeft.decrementAndGet() == 0) {
- allDone.countDown();
- }
- }
- }
- }
- });
- try {
- exec.execute(task);
- // Only incremented if submission is successful.
- tasksLeft.incrementAndGet();
- } catch (RuntimeException ex) {
- if (callback == null) {
- throw ex;
- } else {
- callback.done(Futures.<T>immediateFailedFuture(ex));
- }
- }
- }
- // Remove the 2 that tasksLeft was initialized with.
- if (tasksLeft.addAndGet(-2) == 0) {
- allDone.countDown();
- }
- if (timeout < 0 || unit == null) {
- allDone.await();
- return true;
- } else {
- return allDone.await(timeout, unit);
- }
- }
- /**
- * Submits the given tasks on the supplied executor and
- * waits up to the given amount of time for them to finish,
- * processing any results with the given execution callback.
- * @param exec must be an {@link ExecutorService} provided by Hazelcast
- * @return true unless the time elapsed before completion
- */
- public static <T> boolean invokeAll(
- ExecutorService exec,
- Iterable<DistributedTask<T>> tasks,
- long timeout,
- TimeUnit unit,
- final FutureCallback<T> callback)
- throws InterruptedException {
- checkNotNull(callback);
- return invokeAll(exec, tasks, timeout, unit, new ExecutionCallback<T>() {
- public void done(Future<T> future) {
- doFutureCallback(callback, future);
- }
- });
- }
- /**
- * Returns an {@link ExecutorService} based on {@code exec}
- * that supports {@link ExecutorService#invokeAll invokeAll}
- * even if {@code exec} is a Hazelcast-based ExecutorService
- * (which doesn't support {@code invokeAll} out of the box).
- * The restriction is that the {@link Callable} tasks must be
- * {@link Serializable}. If a Callable task implements {@link Supplier},
- * that Supplier is used to get an Object used as a key passed to the
- * {@link DistributedTask} constructor.
- */
- public static ExecutorService withInvokeAll(final ExecutorService exec) {
- if (isHazelcast(exec)) {
- return new ForwardingExecutorService() {
- @SuppressWarnings("unchecked")
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- throws InterruptedException {
- return doInvokeAll((Collection<Callable<T>>) tasks, -1, null);
- }
- @SuppressWarnings("unchecked")
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit) throws InterruptedException {
- return doInvokeAll((Collection<Callable<T>>) tasks, timeout, unit);
- }
- protected final ExecutorService delegate() {
- return exec;
- }
- private <T> List<Future<T>> doInvokeAll(Collection<Callable<T>> tasks, long timeout, TimeUnit unit)
- throws InterruptedException {
- List<DistributedTask<T>> dtasks = ImmutableList.copyOf(Iterables.transform(tasks,
- new Function<Callable<T>, DistributedTask<T>>() {
- public DistributedTask<T> apply(Callable<T> callable) {
- return callableToTask(callable);
- }
- }
- ));
- ExecUtil.invokeAll(exec, dtasks, timeout, unit, (ExecutionCallback<T>) null);
- @SuppressWarnings("unchecked")
- List<Future<T>> results = (List<Future<T>>) (List<?>) dtasks;
- return results;
- }
- };
- } else {
- return exec;
- }
- }
- /**
- * Returns whether the given {@link ExecutorService} was
- * produced by a call to Hazelcast's
- * {@link HazelcastInstance#getExecutorService getExecutorService}.
- */
- public static boolean isHazelcast(ExecutorService exec) {
- return exec.getClass().getName().matches("^.*hazelcast.*$");
- }
- private static <T> DistributedTask<T> callableToTask(Callable<T> callable) {
- if (callable instanceof Supplier) {
- return new DistributedTask<T>(callable,
- Supplier.class.cast(callable).get());
- } else {
- return new DistributedTask<T>(callable);
- }
- }
- private static <T> void doFutureCallback(FutureCallback<T> callback, Future<T> future) {
- try {
- callback.onSuccess(future.get());
- } catch (ExecutionException ex) {
- callback.onFailure(ex.getCause());
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- }
- private ExecUtil() { /* uninstantiable */ }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement