Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.hazelcast.util;
- import com.google.common.base.Function;
- import com.google.common.collect.ImmutableList;
- import com.google.common.collect.Iterables;
- import com.google.common.collect.MapMaker;
- import com.google.common.util.concurrent.FutureCallback;
- import static com.google.common.base.Preconditions.checkArgument;
- import static com.google.common.base.Preconditions.checkNotNull;
- import com.hazelcast.core.DistributedTask;
- import com.hazelcast.core.Hazelcast;
- import com.hazelcast.core.HazelcastInstance;
- import static com.hazelcast.util.ExecUtil.isHazelcast;
- import java.io.Serializable;
- import java.util.List;
- import java.util.concurrent.Callable;
- import java.util.concurrent.CompletionService;
- import java.util.concurrent.ConcurrentMap;
- import java.util.concurrent.ExecutorCompletionService;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.TimeUnit;
- /**
- * Concurrent function evaluation.
- */
- public abstract class ConcurrentFunction<K, V> implements Function<Iterable<K>, ConcurrentMap<K, V>> {
- /**
- * Instructs Hazelcast to evaluate functions on the member that
- * owns the key.
- */
- @SuppressWarnings("unchecked")
- public static <K> Function<K, Object> selfKey() {
- return (Function<K, Object>) SELF_KEY;
- }
- /**
- * Instructs Hazelcast to evaluate functions on the member of
- * its choice.
- */
- @SuppressWarnings("unchecked")
- public static <K> Function<K, Object> noKey() {
- return (Function<K, Object>) NO_KEY;
- }
- /**
- * Thrown by methods that can have multiple failures but still return a
- * partial results map.
- */
- public static class PartialResultsException extends RuntimeException {
- public PartialResultsException(ConcurrentMap<?, ?> partialResults, Iterable<Throwable> causes) {
- this.partialResults = partialResults;
- this.causes = ImmutableList.copyOf(causes);
- }
- public ConcurrentMap<?, ?> getPartialResults() {
- return partialResults;
- }
- public List<Throwable> getCauses() {
- return causes;
- }
- private final ConcurrentMap<?, ?> partialResults;
- private final ImmutableList<Throwable> causes;
- }
- /**
- * Constructs a function that can apply given function to a
- * sequence of keys concurrently,
- * producing a map from the keys to the resulting function values.
- * Uses the default {@link ExecutorService} from the default
- * Hazelcast instance. Only suitable for settings in which the
- * default {@link HazelcastInstance} is used exclusively, i.e.,
- * through static methods of {@link Hazelcast}.
- * Provides the key as the second argument to the {@link DistributedTask}
- * constructor.
- */
- public static <K, V> ConcurrentFunction<K, V> of(Function<K, V> function) {
- return new HazelcastConcurrentFunction<K, V>(function, Hazelcast.getExecutorService(), ConcurrentFunction.<K>selfKey());
- }
- /**
- * Constructs a function that can apply given function to a
- * sequence of keys concurrently,
- * producing a map from the keys to the resulting function values.
- * Uses the default {@link ExecutorService} from the default
- * Hazelcast instance. Only suitable for settings in which the
- * default {@link HazelcastInstance} is used exclusively, i.e.,
- * through static methods of {@link Hazelcast}.
- * Maps keys to members via a key function, which returns an object
- * to be used as the second argument to the {@link DistributedTask}
- * constructor if non-null, and will use the one argument constructor
- * if null.
- */
- public static <K, V> ConcurrentFunction<K, V> of(Function<K, V> function, Function<K, Object> keyFunction) {
- return new HazelcastConcurrentFunction<K, V>(function, Hazelcast.getExecutorService(), keyFunction);
- }
- /**
- * Constructs a function that can apply given function to a
- * sequence of keys concurrently, using the provided {@link ExecutorService},
- * producing a map from the keys to the resulting function values.
- * If the provided {@link ExecutorService} supports {@link DistributedTask}s,
- * provides the key as the second argument to the {@link DistributedTask}
- * constructor.
- */
- public static <K, V> ConcurrentFunction<K, V> of(Function<K, V> function, ExecutorService exec) {
- if (isHazelcast(exec)) {
- return new HazelcastConcurrentFunction<K, V>(function, exec, ConcurrentFunction.<K>selfKey());
- } else {
- return new SimpleConcurrentFunction<K, V>(function, exec);
- }
- }
- /**
- * Constructs a function that can apply given function to a
- * sequence of keys concurrently, using the provided {@link ExecutorService},
- * producing a map from the keys to the resulting function values.
- * Maps keys to members via a key function, which returns an object
- * to be used as the second argument to the {@link DistributedTask}
- * constructor if non-null, and will use the one argument constructor
- * if null.
- * @param exec must be an {@link ExecutorService} provided by Hazelcast
- */
- public static <K, V> ConcurrentFunction<K, V> of(
- Function<K, V> function,
- ExecutorService exec,
- Function<K, Object> keyFunction) {
- checkArgument(isHazelcast(exec), "ExecutorService must support DistributedTasks");
- return new HazelcastConcurrentFunction<K, V>(function, exec, keyFunction);
- }
- /**
- * Concurrently applies the underlying function to the given keys,
- * producing a map from to keys the corresponding function values.
- * blocking indefinitely until all evaluations have either returned
- * a result, thrown an exception, or been rejected for execution.
- * If interrupted while waiting, restores interrupt status and returns
- * empty map.
- * @throws PartialResultsException if any of the function evaluations
- * throw exceptions or errors.
- */
- public ConcurrentMap<K, V> apply(Iterable<K> keys) throws PartialResultsException {
- try {
- return apply(keys, -1, null);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- return new MapMaker().makeMap();
- }
- }
- /**
- * Concurrently applies the given function to the given keys,
- * producing a map from keys to the corresponding function values,
- * waiting up to the given amount of time for the function evaluations
- * to complete. Key-value mappings might still be added to the map
- * after a timeout.
- * @throws InterruptedException if interrupted while waiting
- * @throws PartialResultsException if any of the function evaluations
- * throw exceptions or errors.
- */
- public ConcurrentMap<K, V> apply(Iterable<K> keys, long timeout, TimeUnit timeoutUnit)
- throws InterruptedException, PartialResultsException {
- final ConcurrentMap<K, V> results = new MapMaker().makeMap();
- final ConcurrentMap<Throwable, Boolean> exceptions = new MapMaker().makeMap();
- return doApply(keys, timeout, timeoutUnit, results, exceptions);
- }
- protected abstract ConcurrentMap<K, V> doApply(
- Iterable<K> keys,
- long timeout,
- TimeUnit timeoutUnit,
- ConcurrentMap<K, V> results,
- ConcurrentMap<Throwable, Boolean> exceptions)
- throws InterruptedException, PartialResultsException;
- static class HazelcastConcurrentFunction<K, V> extends ConcurrentFunction<K, V> {
- HazelcastConcurrentFunction(Function<K, V> function, ExecutorService exec, Function<K, Object> keyFunction) {
- super(function, exec);
- this.keyFunction = checkNotNull(keyFunction, "keyFunction");
- checkArgument(function instanceof Serializable, "Function must be serializable");
- }
- @Override protected ConcurrentMap<K, V> doApply(
- Iterable<K> keys,
- long timeout,
- TimeUnit timeoutUnit,
- final ConcurrentMap<K, V> results,
- final ConcurrentMap<Throwable, Boolean> exceptions)
- throws InterruptedException, PartialResultsException {
- ExecUtil.invokeAll(
- exec,
- Iterables.transform(keys, new Function<K, DistributedTask<Result<K, V>>>() {
- public DistributedTask<Result<K, V>> apply(K key) {
- Task<K, V> task = new Task<K, V>(key, function);
- Object memberKey = keyFunction.apply(key);
- if (memberKey == null) {
- return new DistributedTask<Result<K, V>>(task);
- } else {
- return new DistributedTask<Result<K, V>>(task, memberKey);
- }
- }
- }),
- timeout, timeoutUnit,
- new FutureCallback<Result<K, V>>() {
- public void onSuccess(Result<K, V> result) {
- if (result.value != null) {
- results.put(result.key, result.value);
- }
- }
- public void onFailure(Throwable ex) {
- exceptions.put(ex, true);
- }
- }
- );
- if (exceptions.isEmpty()) {
- return results;
- } else {
- throw new PartialResultsException(results, exceptions.keySet());
- }
- }
- private final Function<K, Object> keyFunction;
- }
- static class SimpleConcurrentFunction<K, V> extends ConcurrentFunction<K, V> {
- SimpleConcurrentFunction(Function<K, V> function, ExecutorService exec) {
- super(function, exec);
- }
- @Override protected ConcurrentMap<K, V> doApply(
- Iterable<K> keys,
- long timeout,
- TimeUnit timeoutUnit,
- final ConcurrentMap<K, V> results,
- final ConcurrentMap<Throwable, Boolean> exceptions)
- throws InterruptedException, PartialResultsException {
- Iterable<Runnable> tasks = Iterables.transform(keys, new Function<K, Runnable>() {
- public Runnable apply(final K key) {
- return new Runnable() {
- public void run() {
- try {
- V value = function.apply(key);
- results.put(key, value);
- } catch (Throwable ex) {
- exceptions.put(ex, true);
- }
- }
- };
- }
- });
- int taskCount = 0;
- CompletionService<Void> cs = new ExecutorCompletionService<Void>(exec);
- for (Runnable task : tasks) {
- cs.submit(task, (Void) null);
- ++taskCount;
- }
- if (timeout < 0 || timeoutUnit == null) {
- while (taskCount > 0) {
- cs.take();
- --taskCount;
- }
- } else {
- long timeoutNanos = timeoutUnit.toNanos(timeout);
- long limit = System.nanoTime() + timeoutNanos;
- while (taskCount > 0 && timeoutNanos >= 0) {
- if (cs.poll(timeoutNanos, TimeUnit.NANOSECONDS) != null) {
- --taskCount;
- }
- timeoutNanos = limit - System.nanoTime();
- }
- }
- if (exceptions.isEmpty()) {
- return results;
- } else {
- throw new PartialResultsException(results, exceptions.keySet());
- }
- }
- }
- static class Result<K, V> implements Serializable {
- Result(K key, V value) {
- this.key = key;
- this.value = value;
- }
- final K key;
- final V value;
- }
- static class Task<K, V> implements Callable<Result<K, V>>, Serializable {
- public Result<K, V> call() {
- return new Result<K, V>(key, function.apply(key));
- }
- Task(K key, Function<K, V> function) {
- this.key = key;
- this.function = function;
- }
- private final K key;
- private final Function<K, V> function;
- }
- /**
- * Default access rather than protected, because this class is not designed
- * for extension in general.
- */
- ConcurrentFunction(Function<K, V> function, ExecutorService exec) {
- this.function = checkNotNull(function, "function");
- this.exec = checkNotNull(exec, "exec");
- }
- protected final Function<K, V> function;
- protected final ExecutorService exec;
- private static final Function<Object, Object> SELF_KEY = new Function<Object, Object>() {
- public Object apply(Object key) {
- return key;
- }
- };
- private static final Function<Object, Object> NO_KEY = new Function<Object, Object>() {
- public Object apply(Object key) {
- return null;
- }
- };
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement