SHARE
TWEET

ConcurrentCacheLoader

tpeierls Mar 10th, 2012 248 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
package com.hazelcast.util;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.hazelcast.util.ExecUtil.isHazelcast;

import com.google.common.base.Function;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.Iterables;
import com.google.common.collect.MapMaker;
import com.google.common.util.concurrent.FutureCallback;

import com.hazelcast.core.DistributedTask;
import com.hazelcast.core.Hazelcast;

import java.io.Serializable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;


  32. /**
  33.  * Loads cache entries concurrently, possibly in a distributed fashion.
  34.  */
  35. public class ConcurrentCacheLoader<K, V> extends CacheLoader<K, V> implements Serializable {
  36.  
  37.     /**
  38.      * Creates a new builder for a concurrent or distributed cache loader.
  39.      */
  40.     public static <K, V> Builder<K, V> newBuilder() {
  41.         return new Builder<K, V>();
  42.     }
  43.  
  44.     /**
  45.      * Immutable builder class for {@link ConcurrentCacheLoader} instances.
  46.      */
  47.     public static class Builder<K, V> {
  48.  
  49.         /**
  50.          * Returns a builder identical to this one but with that will use
  51.          * the given ExecutorService to handle calls to
  52.          * {@link CacheLoader#loadAll loadAll}.
  53.          */
  54.         public Builder<K, V> using(ExecutorService exec) {
  55.             checkArgument(keyFunction == null || isHazelcast(exec),
  56.                 "Can't use non-distributed executor service with distribution function");
  57.             return new Builder<K, V>(checkNotNull(exec, "exec"), timeout, timeoutUnit, keyFunction);
  58.         }
  59.  
  60.         /**
  61.          * Returns a builder identical to this one but with that will use
  62.          * the default Hazelcast ExecutorService to handle calls to
  63.          * {@link CacheLoader#loadAll loadAll}.
  64.          */
  65.         public Builder<K, V> usingDefaultDistributedService() {
  66.             return new Builder<K, V>(
  67.                 Hazelcast.getExecutorService(), timeout, timeoutUnit, keyFunction);
  68.         }
  69.  
  70.         /**
  71.          * Returns a builder identical to this one but with that will use
  72.          * the named Hazelcast ExecutorService to handle calls to
  73.          * {@link CacheLoader#loadAll loadAll}.
  74.          */
  75.         public Builder<K, V> usingDistributedServiceNamed(String named) {
  76.             return new Builder<K, V>(
  77.                 Hazelcast.getExecutorService(named), timeout, timeoutUnit, keyFunction);
  78.         }
  79.  
  80.         /**
  81.          * Returns a builder identical to this one but with that will
  82.          * spend no more than the given amount of time in calls to
  83.          * {@link CacheLoader#loadAll loadAll}.
  84.          */
  85.         public Builder<K, V> timeLimit(long timeout, TimeUnit timeoutUnit) {
  86.             checkArgument(timeout >= 0, "timeout must be non-negative");
  87.             checkNotNull(timeoutUnit, "timeoutUnit");
  88.             return new Builder<K, V>(exec, timeout, timeoutUnit, keyFunction);
  89.         }
  90.  
  91.         /**
  92.          * Returns a builder identical to this one but with that will
  93.          * use the key to direct a distributed task during
  94.          * {@link CacheLoader#loadAll loadAll}.
  95.          */
  96.         public Builder<K, V> byKey() {
  97.             return byKeyFunction(new Function<K, Object>() {
  98.                 public Object apply(K key) {
  99.                     return key;
  100.                 }
  101.             });
  102.         }
  103.  
  104.         /**
  105.          * Returns a builder identical to this one but with that will
  106.          * use the given key function to produce a value from the key
  107.          * to direct a distributed task during {@link CacheLoader#loadAll loadAll}.
  108.          */
  109.         public Builder<K, V> byKeyFunction(Function<K, Object> keyFunction) {
  110.             checkArgument(isHazelcast(exec), "exec must support distributed tasks");
  111.             return new Builder<K, V>(
  112.                 exec, timeout, timeoutUnit, checkNotNull(keyFunction, "keyFunction"));
  113.         }
  114.  
  115.  
  116.         /**
  117.          * Returns a ConcurrentCacheLoader satisfying the constraints
  118.          * of this builder.
  119.          */
  120.         public ConcurrentCacheLoader<K, V> build(Function<K, V> function) {
  121.             if (isHazelcast(exec)) {
  122.                 checkArgument(function instanceof Serializable,
  123.                     "Function must be serializable for distributed execution");
  124.                 return new DistributedCacheLoader<K, V>(
  125.                     function, exec, keyFunction, timeout, timeoutUnit);
  126.             } else {
  127.                 return new ConcurrentCacheLoader<K, V>(
  128.                     function, exec, timeout, timeoutUnit);
  129.             }
  130.         }
  131.  
  132.  
  133.         public Builder() {
  134.             this.exec = DEFAULT_POOL;
  135.             this.timeout = -1;
  136.             this.timeoutUnit = null;
  137.             this.keyFunction = null;
  138.         }
  139.  
  140.         Builder(ExecutorService exec,
  141.                 long timeout,
  142.                 TimeUnit timeoutUnit,
  143.                 Function<K, Object> keyFunction) {
  144.  
  145.             this.exec = exec;
  146.             this.timeout = timeout;
  147.             this.timeoutUnit = timeoutUnit;
  148.             this.keyFunction = keyFunction;
  149.         }
  150.  
  151.         final ExecutorService exec;
  152.         final long timeout;
  153.         final TimeUnit timeoutUnit;
  154.         final Function<K, Object> keyFunction;
  155.  
  156.         private static final ExecutorService DEFAULT_POOL =
  157.             Executors.newCachedThreadPool();
  158.     }
  159.  
  160.  
  161.     @Override public V load(K key) throws Exception {
  162.         return function.apply(key);
  163.     }
  164.  
  165.     @Override public Map<K, V> loadAll(Iterable<? extends K> keys) {
  166.         final ConcurrentMap<K, V> results = new MapMaker().makeMap();
  167.         try {
  168.             return doLoadAll(keys, results);
  169.         } catch (InterruptedException ex) {
  170.             Thread.currentThread().interrupt();
  171.             return results;
  172.         }
  173.     }
  174.  
  175.     protected Map<K, V> doLoadAll(Iterable<? extends K> keys,
  176.                                   final ConcurrentMap<K, V> results)
  177.                                   throws InterruptedException {
  178.  
  179.         Iterable<Runnable> tasks =  Iterables.transform(keys, new Function<K, Runnable>() {
  180.             public Runnable apply(final K key) {
  181.                 return new Runnable() {
  182.                     public void run() {
  183.                         results.put(key, function.apply(key));
  184.                     }
  185.                 };
  186.             }
  187.         });
  188.  
  189.         int taskCount = 0;
  190.         CompletionService<Void> cs = new ExecutorCompletionService<Void>(exec);
  191.         for (Runnable task : tasks) {
  192.             cs.submit(task, (Void) null);
  193.             ++taskCount;
  194.         }
  195.  
  196.         if (timeout < 0 || timeoutUnit == null) {
  197.             while (taskCount > 0) {
  198.                 cs.take();
  199.                 --taskCount;
  200.             }
  201.         } else {
  202.             long timeoutNanos = timeoutUnit.toNanos(timeout);
  203.             long limit = System.nanoTime() + timeoutNanos;
  204.             while (taskCount > 0 && timeoutNanos >= 0) {
  205.                 if (cs.poll(timeoutNanos, TimeUnit.NANOSECONDS) != null) {
  206.                     --taskCount;
  207.                 }
  208.                 timeoutNanos = limit - System.nanoTime();
  209.             }
  210.         }
  211.  
  212.         return results;
  213.     }
  214.  
  215.  
  216.     /**
  217.      * Default access rather than protected, because this class is not
  218.      * designed for extension in general.
  219.      */
  220.     ConcurrentCacheLoader(Function<K, V> function, ExecutorService exec,
  221.                           long timeout, TimeUnit timeoutUnit) {
  222.         this.function = checkNotNull(function, "function");
  223.         this.exec = checkNotNull(exec, "exec");
  224.         this.timeout = timeout;
  225.         this.timeoutUnit = timeoutUnit;
  226.     }
  227.  
  228.     final Function<K, V> function;
  229.     final ExecutorService exec;
  230.     final long timeout;
  231.     final TimeUnit timeoutUnit;
  232.  
  233.  
  234.     static class DistributedCacheLoader<K, V> extends ConcurrentCacheLoader<K, V> {
  235.  
  236.         protected Map<K, V> doLoadAll(Iterable<? extends K> keys,
  237.                                       final ConcurrentMap<K, V> results)
  238.                                       throws InterruptedException {
  239.  
  240.             ExecUtil.invokeAll(
  241.  
  242.                 exec,
  243.  
  244.                 Iterables.transform(keys, new Function<K, DistributedTask<Result<K, V>>>() {
  245.                     public DistributedTask<Result<K, V>> apply(K key) {
  246.                         Task<K, V> task = new Task<K, V>(key, function);
  247.                         if (keyFunction == null) {
  248.                             return new DistributedTask<Result<K, V>>(task);
  249.                         } else {
  250.                             return new DistributedTask<Result<K, V>>(task, keyFunction.apply(key));
  251.                         }
  252.                     }
  253.                 }),
  254.  
  255.                 timeout, timeoutUnit,
  256.  
  257.                 new FutureCallback<Result<K, V>>() {
  258.                     public void onSuccess(Result<K, V> result) {
  259.                         if (result.value != null) {
  260.                             results.put(result.key, result.value);
  261.                         }
  262.                     }
  263.                     public void onFailure(Throwable ex) {
  264.                         // We ignore failures because getAll will detect
  265.                         // the failure to get a result for all requested keys.
  266.                     }
  267.                 }
  268.             );
  269.  
  270.             return results;
  271.         }
  272.  
  273.         static class Result<K, V> implements Serializable {
  274.             Result(K key, V value) {
  275.                 this.key = key;
  276.                 this.value = value;
  277.             }
  278.             final K key;
  279.             final V value;
  280.         }
  281.  
  282.         static class Task<K, V> implements Callable<Result<K, V>>, Serializable {
  283.             public Result<K, V> call() {
  284.                 return new Result<K, V>(key, function.apply(key));
  285.             }
  286.  
  287.             Task(K key, Function<K, V> function) {
  288.                 this.key = key;
  289.                 this.function = function;
  290.             }
  291.             private final K key;
  292.             private final Function<K, V> function;
  293.         }
  294.  
  295.         DistributedCacheLoader(Function<K, V> function,
  296.                                ExecutorService exec,
  297.                                Function<K, Object> keyFunction,
  298.                                long timeout,
  299.                                TimeUnit timeoutUnit) {
  300.  
  301.             super(function, exec, timeout, timeoutUnit);
  302.  
  303.             this.keyFunction = keyFunction;
  304.         }
  305.  
  306.         private final Function<K, Object> keyFunction;
  307.     }
  308. }
RAW Paste Data
Pastebin PRO Summer Special!
Get 40% OFF on Pastebin PRO accounts!
Top