SHARE
TWEET

CacheLoaders

tpeierls Mar 12th, 2012 379 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. /*
  2.  * This code is placed in the public domain by its author, Tim Peierls.
  3.  */
  4. package net.peierls.util.concurrent;
  5.  
  6. import com.google.common.base.Function;
  7. import com.google.common.base.Throwables;
  8. import static com.google.common.base.Preconditions.checkArgument;
  9. import static com.google.common.base.Preconditions.checkNotNull;
  10.  
  11. import com.google.common.cache.CacheLoader;
  12.  
  13. import com.google.common.collect.ImmutableMap;
  14. import com.google.common.collect.Iterables;
  15. import com.google.common.collect.MapMaker;
  16.  
  17. import com.google.common.util.concurrent.AsyncFunction;
  18. import com.google.common.util.concurrent.FutureCallback;
  19. import com.google.common.util.concurrent.Futures;
  20. import com.google.common.util.concurrent.ListenableFuture;
  21. import com.google.common.util.concurrent.UncheckedExecutionException;
  22.  
  23. import java.io.Serializable;
  24.  
  25. import java.util.List;
  26. import java.util.Map;
  27.  
  28. import java.util.concurrent.CountDownLatch;
  29. import java.util.concurrent.ExecutionException;
  30. import java.util.concurrent.TimeoutException;
  31. import java.util.concurrent.TimeUnit;
  32. import java.util.concurrent.atomic.AtomicInteger;
  33.  
  34.  
  35. /**
  36.  * Static factory methods for creating a {@link CacheLoader} instance
  37.  * from an {@link AsyncFunction}.
  38.  * @author Tim Peierls, tim at peierls dot net
  39.  */
  40. public final class CacheLoaders {
  41.  
  42.     /**
  43.      * Creates a cache loader that loads values for keys asynchronously
  44.      * with {@code asyncFunction}, waiting indefinitely for the returned
  45.      * future to complete.
  46.      */
  47.     public static <K, V> CacheLoader<K, V> fromAsyncFunction(
  48.             AsyncFunction<K, V> asyncFunction) {
  49.  
  50.         return new AsyncCacheLoader<K, V>(asyncFunction);
  51.     }
  52.  
  53.     /**
  54.      * Creates a cache loader that loads values for keys asynchronously
  55.      * with {@code asyncFunction}, waiting up to the given amount of time
  56.      * for the returned future to complete. The bulk load operation does
  57.      * not throw an exception on timeout, but the returned map might not
  58.      * contain values for all of the given keys.
  59.      */
  60.     public static <K, V> CacheLoader<K, V> fromAsyncFunction(
  61.             AsyncFunction<K, V> asyncFunction, long timeout, TimeUnit unit) {
  62.  
  63.         checkArgument(timeout >= 0, "timeout must be non-negative");
  64.  
  65.         return new AsyncCacheLoader<K, V>(
  66.             asyncFunction, timeout, checkNotNull(unit, "unit"));
  67.     }
  68.  
  69.  
  70.     static class AsyncCacheLoader<K, V>
  71.             extends CacheLoader<K, V> implements Serializable {
  72.  
  73.         @Override public ListenableFuture<V> reload(K key, V oldValue)
  74.                 throws Exception {
  75.  
  76.             return asyncFunction.apply(key);
  77.         }
  78.  
  79.         @Override public V load(K key) throws Exception {
  80.             ListenableFuture<V> future = asyncFunction.apply(key);
  81.             try {
  82.                 if (timeout < 0 || unit == null) {
  83.                     return future.get();
  84.                 } else {
  85.                     return future.get(timeout, unit);
  86.                 }
  87.             } catch (ExecutionException ex) {
  88.                 Throwable t = ex.getCause();
  89.                 Throwables.propagateIfInstanceOf(t, Exception.class);
  90.                 throw new RuntimeException("unexpected", t); // Should never get here
  91.             }
  92.         }
  93.  
  94.         @Override public Map<K, V> loadAll(Iterable<? extends K> keys)
  95.                 throws Exception {
  96.  
  97.             // Results map has to be safe for concurrent access:
  98.             final Map<K, V> results = new MapMaker().makeMap();
  99.  
  100.             // Start with one extra to prevent decrementing to 0
  101.             // until we're ready.
  102.             final AtomicInteger toDoCount = new AtomicInteger(1);
  103.             final CountDownLatch allDone = new CountDownLatch(1);
  104.  
  105.             for (final K key : keys) {
  106.                 toDoCount.incrementAndGet();
  107.                 Futures.addCallback(
  108.                     asyncFunction.apply(key),
  109.                     new FutureCallback<V>() {
  110.                         public void onSuccess(V result) {
  111.                             results.put(key, result);
  112.                             notifyDone();
  113.                         }
  114.  
  115.                         public void onFailure(Throwable t) {
  116.                             // Ignore failures: getAll checks to see if
  117.                             // the returned map has all requested keys,
  118.                             // and caches the successful ones.
  119.                             notifyDone();
  120.                         }
  121.  
  122.                         private void notifyDone() {
  123.                             if (toDoCount.decrementAndGet() == 0) {
  124.                                 allDone.countDown();
  125.                             }
  126.                         }
  127.                     }
  128.                 );
  129.             }
  130.  
  131.             // Remove the extra count now that all async
  132.             // applications have been made.
  133.  
  134.             if (toDoCount.decrementAndGet() == 0) {
  135.                 allDone.countDown();
  136.             }
  137.  
  138.             try {
  139.                 if (timeout < 0 || unit == null) {
  140.                     allDone.await();
  141.                     return results;     // safe to return without copying
  142.                 } else {
  143.                     if (allDone.await(timeout, unit)) {
  144.                         return results; // safe to return without copying
  145.                     }
  146.                 }
  147.             } catch (InterruptedException ex) {
  148.                 Thread.currentThread().interrupt();
  149.             }
  150.  
  151.             // Timed out or interrupted, so some callbacks could
  152.             // still fire. Copy to a stable value.
  153.  
  154.             return ImmutableMap.copyOf(results);
  155.         }
  156.  
  157.  
  158.         AsyncCacheLoader(AsyncFunction<K, V> asyncFunction) {
  159.             this.asyncFunction = asyncFunction;
  160.             this.timeout = -1;
  161.             this.unit = null;
  162.         }
  163.  
  164.         AsyncCacheLoader(AsyncFunction<K, V> asyncFunction,
  165.                          long timeout, TimeUnit unit) {
  166.  
  167.             this.asyncFunction = asyncFunction;
  168.             this.timeout = timeout;
  169.             this.unit = unit;
  170.         }
  171.  
  172.         private final AsyncFunction<K, V> asyncFunction;
  173.         private final long timeout;
  174.         private final TimeUnit unit;
  175.     }
  176.  
  177.  
  178.     private CacheLoaders() { /* uninstantiable */ }
  179. }
RAW Paste Data
Top