Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /*
- * This code is placed in the public domain by its author, Tim Peierls.
- */
- package net.peierls.util.concurrent;
- import com.google.common.base.Function;
- import com.google.common.base.Throwables;
- import static com.google.common.base.Preconditions.checkArgument;
- import static com.google.common.base.Preconditions.checkNotNull;
- import com.google.common.cache.CacheLoader;
- import com.google.common.collect.ImmutableMap;
- import com.google.common.collect.Iterables;
- import com.google.common.collect.MapMaker;
- import com.google.common.util.concurrent.AsyncFunction;
- import com.google.common.util.concurrent.FutureCallback;
- import com.google.common.util.concurrent.Futures;
- import com.google.common.util.concurrent.ListenableFuture;
- import com.google.common.util.concurrent.UncheckedExecutionException;
- import java.io.Serializable;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.TimeoutException;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.atomic.AtomicInteger;
- /**
- * Static factory methods for creating a {@link CacheLoader} instance
- * from an {@link AsyncFunction}.
- * @author Tim Peierls, tim at peierls dot net
- */
- public final class CacheLoaders {
- /**
- * Creates a cache loader that loads values for keys asynchronously
- * with {@code asyncFunction}, waiting indefinitely for the returned
- * future to complete.
- */
- public static <K, V> CacheLoader<K, V> fromAsyncFunction(
- AsyncFunction<K, V> asyncFunction) {
- return new AsyncCacheLoader<K, V>(asyncFunction);
- }
- /**
- * Creates a cache loader that loads values for keys asynchronously
- * with {@code asyncFunction}, waiting up to the given amount of time
- * for the returned future to complete. The bulk load operation does
- * not throw an exception on timeout, but the returned map might not
- * contain values for all of the given keys.
- */
- public static <K, V> CacheLoader<K, V> fromAsyncFunction(
- AsyncFunction<K, V> asyncFunction, long timeout, TimeUnit unit) {
- checkArgument(timeout >= 0, "timeout must be non-negative");
- return new AsyncCacheLoader<K, V>(
- asyncFunction, timeout, checkNotNull(unit, "unit"));
- }
- static class AsyncCacheLoader<K, V>
- extends CacheLoader<K, V> implements Serializable {
- @Override public ListenableFuture<V> reload(K key, V oldValue)
- throws Exception {
- return asyncFunction.apply(key);
- }
- @Override public V load(K key) throws Exception {
- ListenableFuture<V> future = asyncFunction.apply(key);
- try {
- if (timeout < 0 || unit == null) {
- return future.get();
- } else {
- return future.get(timeout, unit);
- }
- } catch (ExecutionException ex) {
- Throwable t = ex.getCause();
- Throwables.propagateIfInstanceOf(t, Exception.class);
- throw new RuntimeException("unexpected", t); // Should never get here
- }
- }
- @Override public Map<K, V> loadAll(Iterable<? extends K> keys)
- throws Exception {
- // Results map has to be safe for concurrent access:
- final Map<K, V> results = new MapMaker().makeMap();
- // Start with one extra to prevent decrementing to 0
- // until we're ready.
- final AtomicInteger toDoCount = new AtomicInteger(1);
- final CountDownLatch allDone = new CountDownLatch(1);
- for (final K key : keys) {
- toDoCount.incrementAndGet();
- Futures.addCallback(
- asyncFunction.apply(key),
- new FutureCallback<V>() {
- public void onSuccess(V result) {
- results.put(key, result);
- notifyDone();
- }
- public void onFailure(Throwable t) {
- // Ignore failures: getAll checks to see if
- // the returned map has all requested keys,
- // and caches the successful ones.
- notifyDone();
- }
- private void notifyDone() {
- if (toDoCount.decrementAndGet() == 0) {
- allDone.countDown();
- }
- }
- }
- );
- }
- // Remove the extra count now that all async
- // applications have been made.
- if (toDoCount.decrementAndGet() == 0) {
- allDone.countDown();
- }
- try {
- if (timeout < 0 || unit == null) {
- allDone.await();
- return results; // safe to return without copying
- } else {
- if (allDone.await(timeout, unit)) {
- return results; // safe to return without copying
- }
- }
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- // Timed out or interrupted, so some callbacks could
- // still fire. Copy to a stable value.
- return ImmutableMap.copyOf(results);
- }
- AsyncCacheLoader(AsyncFunction<K, V> asyncFunction) {
- this.asyncFunction = asyncFunction;
- this.timeout = -1;
- this.unit = null;
- }
- AsyncCacheLoader(AsyncFunction<K, V> asyncFunction,
- long timeout, TimeUnit unit) {
- this.asyncFunction = asyncFunction;
- this.timeout = timeout;
- this.unit = unit;
- }
- private final AsyncFunction<K, V> asyncFunction;
- private final long timeout;
- private final TimeUnit unit;
- }
- private CacheLoaders() { /* uninstantiable */ }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement