Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
package com.hazelcast.util;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.hazelcast.util.ExecUtil.isHazelcast;

import com.google.common.base.Function;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.Iterables;
import com.google.common.collect.MapMaker;
import com.google.common.util.concurrent.FutureCallback;
import com.hazelcast.core.DistributedTask;
import com.hazelcast.core.Hazelcast;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
- /**
- * Loads cache entries concurrently, possibly in a distributed fashion.
- */
- public class ConcurrentCacheLoader<K, V> extends CacheLoader<K, V> implements Serializable {
- /**
- * Creates a new builder for a concurrent or distributed cache loader.
- */
- public static <K, V> Builder<K, V> newBuilder() {
- return new Builder<K, V>();
- }
- /**
- * Immutable builder class for {@link ConcurrentCacheLoader} instances.
- */
- public static class Builder<K, V> {
- /**
- * Returns a builder identical to this one but with that will use
- * the given ExecutorService to handle calls to
- * {@link CacheLoader#loadAll loadAll}.
- */
- public Builder<K, V> using(ExecutorService exec) {
- checkArgument(keyFunction == null || isHazelcast(exec),
- "Can't use non-distributed executor service with distribution function");
- return new Builder<K, V>(checkNotNull(exec, "exec"), timeout, timeoutUnit, keyFunction);
- }
- /**
- * Returns a builder identical to this one but with that will use
- * the default Hazelcast ExecutorService to handle calls to
- * {@link CacheLoader#loadAll loadAll}.
- */
- public Builder<K, V> usingDefaultDistributedService() {
- return new Builder<K, V>(
- Hazelcast.getExecutorService(), timeout, timeoutUnit, keyFunction);
- }
- /**
- * Returns a builder identical to this one but with that will use
- * the named Hazelcast ExecutorService to handle calls to
- * {@link CacheLoader#loadAll loadAll}.
- */
- public Builder<K, V> usingDistributedServiceNamed(String named) {
- return new Builder<K, V>(
- Hazelcast.getExecutorService(named), timeout, timeoutUnit, keyFunction);
- }
- /**
- * Returns a builder identical to this one but with that will
- * spend no more than the given amount of time in calls to
- * {@link CacheLoader#loadAll loadAll}.
- */
- public Builder<K, V> timeLimit(long timeout, TimeUnit timeoutUnit) {
- checkArgument(timeout >= 0, "timeout must be non-negative");
- checkNotNull(timeoutUnit, "timeoutUnit");
- return new Builder<K, V>(exec, timeout, timeoutUnit, keyFunction);
- }
- /**
- * Returns a builder identical to this one but with that will
- * use the key to direct a distributed task during
- * {@link CacheLoader#loadAll loadAll}.
- */
- public Builder<K, V> byKey() {
- return byKeyFunction(new Function<K, Object>() {
- public Object apply(K key) {
- return key;
- }
- });
- }
- /**
- * Returns a builder identical to this one but with that will
- * use the given key function to produce a value from the key
- * to direct a distributed task during {@link CacheLoader#loadAll loadAll}.
- */
- public Builder<K, V> byKeyFunction(Function<K, Object> keyFunction) {
- checkArgument(isHazelcast(exec), "exec must support distributed tasks");
- return new Builder<K, V>(
- exec, timeout, timeoutUnit, checkNotNull(keyFunction, "keyFunction"));
- }
- /**
- * Returns a ConcurrentCacheLoader satisfying the constraints
- * of this builder.
- */
- public ConcurrentCacheLoader<K, V> build(Function<K, V> function) {
- if (isHazelcast(exec)) {
- checkArgument(function instanceof Serializable,
- "Function must be serializable for distributed execution");
- return new DistributedCacheLoader<K, V>(
- function, exec, keyFunction, timeout, timeoutUnit);
- } else {
- return new ConcurrentCacheLoader<K, V>(
- function, exec, timeout, timeoutUnit);
- }
- }
- public Builder() {
- this.exec = DEFAULT_POOL;
- this.timeout = -1;
- this.timeoutUnit = null;
- this.keyFunction = null;
- }
- Builder(ExecutorService exec,
- long timeout,
- TimeUnit timeoutUnit,
- Function<K, Object> keyFunction) {
- this.exec = exec;
- this.timeout = timeout;
- this.timeoutUnit = timeoutUnit;
- this.keyFunction = keyFunction;
- }
- final ExecutorService exec;
- final long timeout;
- final TimeUnit timeoutUnit;
- final Function<K, Object> keyFunction;
- private static final ExecutorService DEFAULT_POOL =
- Executors.newCachedThreadPool();
- }
- @Override public V load(K key) throws Exception {
- return function.apply(key);
- }
- @Override public Map<K, V> loadAll(Iterable<? extends K> keys) {
- final ConcurrentMap<K, V> results = new MapMaker().makeMap();
- try {
- return doLoadAll(keys, results);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- return results;
- }
- }
- protected Map<K, V> doLoadAll(Iterable<? extends K> keys,
- final ConcurrentMap<K, V> results)
- throws InterruptedException {
- Iterable<Runnable> tasks = Iterables.transform(keys, new Function<K, Runnable>() {
- public Runnable apply(final K key) {
- return new Runnable() {
- public void run() {
- results.put(key, function.apply(key));
- }
- };
- }
- });
- int taskCount = 0;
- CompletionService<Void> cs = new ExecutorCompletionService<Void>(exec);
- for (Runnable task : tasks) {
- cs.submit(task, (Void) null);
- ++taskCount;
- }
- if (timeout < 0 || timeoutUnit == null) {
- while (taskCount > 0) {
- cs.take();
- --taskCount;
- }
- } else {
- long timeoutNanos = timeoutUnit.toNanos(timeout);
- long limit = System.nanoTime() + timeoutNanos;
- while (taskCount > 0 && timeoutNanos >= 0) {
- if (cs.poll(timeoutNanos, TimeUnit.NANOSECONDS) != null) {
- --taskCount;
- }
- timeoutNanos = limit - System.nanoTime();
- }
- }
- return results;
- }
- /**
- * Default access rather than protected, because this class is not
- * designed for extension in general.
- */
- ConcurrentCacheLoader(Function<K, V> function, ExecutorService exec,
- long timeout, TimeUnit timeoutUnit) {
- this.function = checkNotNull(function, "function");
- this.exec = checkNotNull(exec, "exec");
- this.timeout = timeout;
- this.timeoutUnit = timeoutUnit;
- }
- final Function<K, V> function;
- final ExecutorService exec;
- final long timeout;
- final TimeUnit timeoutUnit;
- static class DistributedCacheLoader<K, V> extends ConcurrentCacheLoader<K, V> {
- protected Map<K, V> doLoadAll(Iterable<? extends K> keys,
- final ConcurrentMap<K, V> results)
- throws InterruptedException {
- ExecUtil.invokeAll(
- exec,
- Iterables.transform(keys, new Function<K, DistributedTask<Result<K, V>>>() {
- public DistributedTask<Result<K, V>> apply(K key) {
- Task<K, V> task = new Task<K, V>(key, function);
- if (keyFunction == null) {
- return new DistributedTask<Result<K, V>>(task);
- } else {
- return new DistributedTask<Result<K, V>>(task, keyFunction.apply(key));
- }
- }
- }),
- timeout, timeoutUnit,
- new FutureCallback<Result<K, V>>() {
- public void onSuccess(Result<K, V> result) {
- if (result.value != null) {
- results.put(result.key, result.value);
- }
- }
- public void onFailure(Throwable ex) {
- // We ignore failures because getAll will detect
- // the failure to get a result for all requested keys.
- }
- }
- );
- return results;
- }
- static class Result<K, V> implements Serializable {
- Result(K key, V value) {
- this.key = key;
- this.value = value;
- }
- final K key;
- final V value;
- }
- static class Task<K, V> implements Callable<Result<K, V>>, Serializable {
- public Result<K, V> call() {
- return new Result<K, V>(key, function.apply(key));
- }
- Task(K key, Function<K, V> function) {
- this.key = key;
- this.function = function;
- }
- private final K key;
- private final Function<K, V> function;
- }
- DistributedCacheLoader(Function<K, V> function,
- ExecutorService exec,
- Function<K, Object> keyFunction,
- long timeout,
- TimeUnit timeoutUnit) {
- super(function, exec, timeout, timeoutUnit);
- this.keyFunction = keyFunction;
- }
- private final Function<K, Object> keyFunction;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement