Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /*
- * This code is placed in the public domain by its author, Tim Peierls.
- */
- package com.hazelcast.util;
- import com.google.common.base.Function;
- import static com.google.common.base.Preconditions.checkArgument;
- import static com.google.common.base.Preconditions.checkNotNull;
- import com.google.common.util.concurrent.AsyncFunction;
- import com.google.common.util.concurrent.ForwardingListenableFuture;
- import com.google.common.util.concurrent.Futures;
- import com.google.common.util.concurrent.ListenableFuture;
- import com.google.common.util.concurrent.SettableFuture;
- import com.hazelcast.core.DistributedTask;
- import com.hazelcast.core.ExecutionCallback;
- import com.hazelcast.core.Hazelcast;
- import com.hazelcast.core.HazelcastInstance;
- import java.io.Serializable;
- import java.util.concurrent.Callable;
- import java.util.concurrent.Executor;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.Future;
- /**
- * An implementation of {@link AsyncFunction} that is applied in
- * a Hazelcast distributed task, potentially on a different node
- * of the Hazelcast cluster than the one on which {@link #apply}
- * is called.
- */
- public class HazelcastAsyncFunction<K, V> implements AsyncFunction<K, V> {
- /**
- * Creates a builder of {@link AsyncFunction} instances that
- * will evaluate the given {@link Function} asynchronously.
- */
- public static <K, V> HazelcastAsyncFunction<K, V> from(Function<K, V> function) {
- checkArgument(function instanceof Serializable,
- "function must be serializable");
- return new HazelcastAsyncFunction<K, V>(function);
- }
- /**
- * Creates a HazelcastAsyncFunction identical to this one but running
- * function applications on the given Hazelcast {@link Executor}.
- *
- * @throws IllegalArgumentException if exec is not a
- * Hazelcast Executor
- */
- public HazelcastAsyncFunction<K, V> onExecutor(Executor exec) {
- checkArgument(supportsDistributedExecution(checkNotNull(exec, "exec")),
- "exec must be a Hazelcast-managed Executor");
- return new HazelcastAsyncFunction<K, V>(function, exec, null, taskKeyFunction);
- }
- /**
- * Creates a HazelcastAsyncFunction identical to this one but running
- * function applications on the named Hazelcast {@link Executor}.
- */
- public HazelcastAsyncFunction<K, V> onExecutorNamed(String name) {
- return new HazelcastAsyncFunction<K, V>(function, null, name, taskKeyFunction);
- }
- /**
- * Creates a HazelcastAsyncFunction identical to this one but running
- * function applications on the default Hazelcast {@link Executor}.
- * This is the default behavior.
- */
- public HazelcastAsyncFunction<K, V> onDefaultExecutor() {
- return new HazelcastAsyncFunction<K, V>(function, null, null, taskKeyFunction);
- }
- /**
- * Creates a HazelcastAsyncFunction identical to this one, but using the
- * supplied task key function to map input values to a
- * task key object that Hazelcast will use to determine
- * which cluster node to run a function application on.
- */
- public HazelcastAsyncFunction<K, V> withTaskKeyFunction(Function<K, Object> taskKeyFunction) {
- return new HazelcastAsyncFunction<K, V>(function, exec, name,
- checkNotNull(taskKeyFunction, "taskKeyFunction"));
- }
- /**
- * Creates a HazelcastAsyncFunction identical to this one, but using the
- * input values as task keys.
- */
- public HazelcastAsyncFunction<K, V> withInputsAsTaskKeys() {
- Function<K, Object> identityTaskKeyFunction = new Function<K, Object>() {
- public Object apply(K input) { return input; }
- };
- return new HazelcastAsyncFunction<K, V>(function, exec, name, identityTaskKeyFunction);
- }
- /**
- * Creates a HazelcastAsyncFunction identical to this one, but not providing
- * task keys, letting Hazelcast decide which node to perform function
- * applications on. This is the default behavior.
- */
- public HazelcastAsyncFunction<K, V> withNoTaskKeys(Function<K, Object> taskKeyFunction) {
- return new HazelcastAsyncFunction<K, V>(function, exec, name, null);
- }
- public ListenableFuture<V> apply(K key) {
- if (key instanceof Serializable) {
- CallbackFuture<V> future = new CallbackFuture<V>();
- Callable<V> callable = new FunctionApplyingCallable<K, V>(function, key);
- DistributedTask<V> task = taskKeyFunction == null
- ? new DistributedTask<V>(callable)
- : new DistributedTask<V>(callable, taskKeyFunction.apply(key))
- ;
- task.setExecutionCallback(future);
- getExecutor().execute(task);
- return future;
- } else {
- // Key isn't serializable, so we apply the function
- // synchronously rather than throw an exception.
- return Futures.immediateFuture(function.apply(key));
- }
- }
- static class CallbackFuture<V>
- extends ForwardingListenableFuture<V>
- implements ExecutionCallback<V> {
- public void done(Future<V> future) {
- try {
- delegate.set(future.get());
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- } catch (ExecutionException ex) {
- delegate.setException(ex.getCause());
- } catch (Throwable ex) {
- delegate.setException(ex);
- }
- }
- @Override protected final ListenableFuture<V> delegate() {
- return delegate;
- }
- private final SettableFuture<V> delegate =
- SettableFuture.create();
- }
- static class FunctionApplyingCallable<K, V>
- implements Callable<V>, Serializable {
- FunctionApplyingCallable(Function<K, V> function, K input) {
- this.function = function;
- this.input = input;
- }
- public V call() {
- return function.apply(input);
- }
- private final Function<K, V> function;
- private final K input;
- }
- HazelcastAsyncFunction(Function<K, V> function) {
- this.function = function;
- this.exec = null;
- this.name = null;
- this.taskKeyFunction = null;
- }
- HazelcastAsyncFunction(Function<K, V> function,
- Executor exec,
- String name,
- Function<K, Object> taskKeyFunction) {
- this.function = function;
- this.exec = exec;
- this.name = name;
- this.taskKeyFunction = taskKeyFunction;
- }
- /**
- * Returns whether the given {@link Executor} supports distributed
- * execution by checking if it was produced by a call to Hazelcast's
- * {@link HazelcastInstance#getExecutorService getExecutorService}.
- */
- private static boolean supportsDistributedExecution(Executor exec) {
- return exec.getClass().getName().matches("^.*hazelcast.*$");
- }
- private Executor getExecutor() {
- // Avoid calling Hazelcast methods until we actually
- // use the Executor.
- if (exec == null) {
- if (name == null) {
- return Hazelcast.getExecutorService();
- } else {
- return Hazelcast.getExecutorService(name);
- }
- } else {
- return exec;
- }
- }
- private final Function<K, V> function;
- private final Executor exec;
- private final String name;
- private final Function<K, Object> taskKeyFunction;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement