Advertisement
tpeierls

ConcurrentFunction

Mar 12th, 2012
542
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5 13.43 KB | None | 0 0
  1.  
  2. package com.hazelcast.util;
  3.  
  4. import com.google.common.base.Function;
  5. import com.google.common.collect.ImmutableList;
  6. import com.google.common.collect.Iterables;
  7. import com.google.common.collect.MapMaker;
  8. import com.google.common.util.concurrent.FutureCallback;
  9. import static com.google.common.base.Preconditions.checkArgument;
  10. import static com.google.common.base.Preconditions.checkNotNull;
  11.  
  12. import com.hazelcast.core.DistributedTask;
  13. import com.hazelcast.core.Hazelcast;
  14. import com.hazelcast.core.HazelcastInstance;
  15.  
  16. import static com.hazelcast.util.ExecUtil.isHazelcast;
  17.  
  18. import java.io.Serializable;
  19.  
  20. import java.util.List;
  21.  
  22. import java.util.concurrent.Callable;
  23. import java.util.concurrent.CompletionService;
  24. import java.util.concurrent.ConcurrentMap;
  25. import java.util.concurrent.ExecutorCompletionService;
  26. import java.util.concurrent.ExecutorService;
  27. import java.util.concurrent.TimeUnit;
  28.  
  29.  
  30. /**
  31.  * Concurrent function evaluation.
  32.  */
  33. public abstract class ConcurrentFunction<K, V> implements Function<Iterable<K>, ConcurrentMap<K, V>> {
  34.  
  35.     /**
  36.      * Instructs Hazelcast to evaluate functions on the member that
  37.      * owns the key.
  38.      */
  39.     @SuppressWarnings("unchecked")
  40.     public static <K> Function<K, Object> selfKey() {
  41.         return (Function<K, Object>) SELF_KEY;
  42.     }
  43.  
  44.     /**
  45.      * Instructs Hazelcast to evaluate functions on the member of
  46.      * its choice.
  47.      */
  48.     @SuppressWarnings("unchecked")
  49.     public static <K> Function<K, Object> noKey() {
  50.         return (Function<K, Object>) NO_KEY;
  51.     }
  52.  
  53.     /**
  54.      * Thrown by methods that can have multiple failures but still return a
  55.      * partial results map.
  56.      */
  57.     public static class PartialResultsException extends RuntimeException {
  58.         public PartialResultsException(ConcurrentMap<?, ?> partialResults, Iterable<Throwable> causes) {
  59.             this.partialResults = partialResults;
  60.             this.causes = ImmutableList.copyOf(causes);
  61.         }
  62.  
  63.         public ConcurrentMap<?, ?> getPartialResults() {
  64.             return partialResults;
  65.         }
  66.  
  67.         public List<Throwable> getCauses() {
  68.             return causes;
  69.         }
  70.  
  71.         private final ConcurrentMap<?, ?> partialResults;
  72.         private final ImmutableList<Throwable> causes;
  73.     }
  74.  
  75.  
  76.     /**
  77.      * Constructs a function that can apply given function to a
  78.      * sequence of keys concurrently,
  79.      * producing a map from the keys to the resulting function values.
  80.      * Uses the default {@link ExecutorService} from the default
  81.      * Hazelcast instance. Only suitable for settings in which the
  82.      * default {@link HazelcastInstance} is used exclusively, i.e.,
  83.      * through static methods of {@link Hazelcast}.
  84.      * Provides the key as the second argument to the {@link DistributedTask}
  85.      * constructor.
  86.      */
  87.     public static <K, V> ConcurrentFunction<K, V> of(Function<K, V> function) {
  88.         return new HazelcastConcurrentFunction<K, V>(function, Hazelcast.getExecutorService(), ConcurrentFunction.<K>selfKey());
  89.     }
  90.  
  91.  
  92.     /**
  93.      * Constructs a function that can apply given function to a
  94.      * sequence of keys concurrently,
  95.      * producing a map from the keys to the resulting function values.
  96.      * Uses the default {@link ExecutorService} from the default
  97.      * Hazelcast instance. Only suitable for settings in which the
  98.      * default {@link HazelcastInstance} is used exclusively, i.e.,
  99.      * through static methods of {@link Hazelcast}.
  100.      * Maps keys to members via a key function, which returns an object
  101.      * to be used as the second argument to the {@link DistributedTask}
  102.      * constructor if non-null, and will use the one argument constructor
  103.      * if null.
  104.      */
  105.     public static <K, V> ConcurrentFunction<K, V> of(Function<K, V> function, Function<K, Object> keyFunction) {
  106.         return new HazelcastConcurrentFunction<K, V>(function, Hazelcast.getExecutorService(), keyFunction);
  107.     }
  108.  
  109.     /**
  110.      * Constructs a function that can apply given function to a
  111.      * sequence of keys concurrently, using the provided {@link ExecutorService},
  112.      * producing a map from the keys to the resulting function values.
  113.      * If the provided {@link ExecutorService} supports {@link DistributedTask}s,
  114.      * provides the key as the second argument to the {@link DistributedTask}
  115.      * constructor.
  116.      */
  117.     public static <K, V> ConcurrentFunction<K, V> of(Function<K, V> function, ExecutorService exec) {
  118.         if (isHazelcast(exec)) {
  119.             return new HazelcastConcurrentFunction<K, V>(function, exec, ConcurrentFunction.<K>selfKey());
  120.         } else {
  121.             return new SimpleConcurrentFunction<K, V>(function, exec);
  122.         }
  123.     }
  124.  
  125.     /**
  126.      * Constructs a function that can apply given function to a
  127.      * sequence of keys concurrently, using the provided {@link ExecutorService},
  128.      * producing a map from the keys to the resulting function values.
  129.      * Maps keys to members via a key function, which returns an object
  130.      * to be used as the second argument to the {@link DistributedTask}
  131.      * constructor if non-null, and will use the one argument constructor
  132.      * if null.
  133.      * @param exec must be an {@link ExecutorService} provided by Hazelcast
  134.      */
  135.     public static <K, V> ConcurrentFunction<K, V> of(
  136.             Function<K, V> function,
  137.             ExecutorService exec,
  138.             Function<K, Object> keyFunction) {
  139.  
  140.         checkArgument(isHazelcast(exec), "ExecutorService must support DistributedTasks");
  141.  
  142.         return new HazelcastConcurrentFunction<K, V>(function, exec, keyFunction);
  143.     }
  144.  
  145.  
  146.     /**
  147.      * Concurrently applies the underlying function to the given keys,
  148.      * producing a map from to keys the corresponding function values.
  149.      * blocking indefinitely until all evaluations have either returned
  150.      * a result, thrown an exception, or been rejected for execution.
  151.      * If interrupted while waiting, restores interrupt status and returns
  152.      * empty map.
  153.      * @throws PartialResultsException if any of the function evaluations
  154.      * throw exceptions or errors.
  155.      */
  156.     public ConcurrentMap<K, V> apply(Iterable<K> keys) throws PartialResultsException {
  157.         try {
  158.             return apply(keys, -1, null);
  159.         } catch (InterruptedException ex) {
  160.             Thread.currentThread().interrupt();
  161.             return new MapMaker().makeMap();
  162.         }
  163.     }
  164.  
  165.     /**
  166.      * Concurrently applies the given function to the given keys,
  167.      * producing a map from keys to the corresponding function values,
  168.      * waiting up to the given amount of time for the function evaluations
  169.      * to complete. Key-value mappings might still be added to the map
  170.      * after a timeout.
  171.      * @throws InterruptedException if interrupted while waiting
  172.      * @throws PartialResultsException if any of the function evaluations
  173.      * throw exceptions or errors.
  174.      */
  175.     public ConcurrentMap<K, V> apply(Iterable<K> keys, long timeout, TimeUnit timeoutUnit)
  176.             throws InterruptedException, PartialResultsException {
  177.  
  178.         final ConcurrentMap<K, V> results = new MapMaker().makeMap();
  179.         final ConcurrentMap<Throwable, Boolean> exceptions = new MapMaker().makeMap();
  180.  
  181.         return doApply(keys, timeout, timeoutUnit, results, exceptions);
  182.     }
  183.  
  184.     protected abstract ConcurrentMap<K, V> doApply(
  185.             Iterable<K> keys,
  186.             long timeout,
  187.             TimeUnit timeoutUnit,
  188.             ConcurrentMap<K, V> results,
  189.             ConcurrentMap<Throwable, Boolean> exceptions)
  190.             throws InterruptedException, PartialResultsException;
  191.  
  192.  
  193.     static class HazelcastConcurrentFunction<K, V> extends ConcurrentFunction<K, V> {
  194.  
  195.         HazelcastConcurrentFunction(Function<K, V> function, ExecutorService exec, Function<K, Object> keyFunction) {
  196.             super(function, exec);
  197.             this.keyFunction = checkNotNull(keyFunction, "keyFunction");
  198.             checkArgument(function instanceof Serializable, "Function must be serializable");
  199.         }
  200.  
  201.         @Override protected ConcurrentMap<K, V> doApply(
  202.                 Iterable<K> keys,
  203.                 long timeout,
  204.                 TimeUnit timeoutUnit,
  205.                 final ConcurrentMap<K, V> results,
  206.                 final ConcurrentMap<Throwable, Boolean> exceptions)
  207.                 throws InterruptedException, PartialResultsException {
  208.  
  209.             ExecUtil.invokeAll(
  210.  
  211.                 exec,
  212.  
  213.                 Iterables.transform(keys, new Function<K, DistributedTask<Result<K, V>>>() {
  214.                     public DistributedTask<Result<K, V>> apply(K key) {
  215.                         Task<K, V> task = new Task<K, V>(key, function);
  216.                         Object memberKey = keyFunction.apply(key);
  217.                         if (memberKey == null) {
  218.                             return new DistributedTask<Result<K, V>>(task);
  219.                         } else {
  220.                             return new DistributedTask<Result<K, V>>(task, memberKey);
  221.                         }
  222.                     }
  223.                 }),
  224.  
  225.                 timeout,  timeoutUnit,
  226.  
  227.                 new FutureCallback<Result<K, V>>() {
  228.                     public void onSuccess(Result<K, V> result) {
  229.                         if (result.value != null) {
  230.                             results.put(result.key, result.value);
  231.                         }
  232.                     }
  233.                     public void onFailure(Throwable ex) {
  234.                         exceptions.put(ex, true);
  235.                     }
  236.                 }
  237.             );
  238.  
  239.             if (exceptions.isEmpty()) {
  240.                 return results;
  241.             } else {
  242.                 throw new PartialResultsException(results, exceptions.keySet());
  243.             }
  244.         }
  245.  
  246.         private final Function<K, Object> keyFunction;
  247.     }
  248.  
  249.  
  250.     static class SimpleConcurrentFunction<K, V> extends ConcurrentFunction<K, V> {
  251.  
  252.         SimpleConcurrentFunction(Function<K, V> function, ExecutorService exec) {
  253.             super(function, exec);
  254.         }
  255.  
  256.         @Override protected ConcurrentMap<K, V> doApply(
  257.                 Iterable<K> keys,
  258.                 long timeout,
  259.                 TimeUnit timeoutUnit,
  260.                 final ConcurrentMap<K, V> results,
  261.                 final ConcurrentMap<Throwable, Boolean> exceptions)
  262.                 throws InterruptedException, PartialResultsException {
  263.  
  264.             Iterable<Runnable> tasks =  Iterables.transform(keys, new Function<K, Runnable>() {
  265.                 public Runnable apply(final K key) {
  266.                     return new Runnable() {
  267.                         public void run() {
  268.                             try {
  269.                                 V value = function.apply(key);
  270.                                 results.put(key, value);
  271.                             } catch (Throwable ex) {
  272.                                 exceptions.put(ex, true);
  273.                             }
  274.                         }
  275.                     };
  276.                 }
  277.             });
  278.  
  279.             int taskCount = 0;
  280.             CompletionService<Void> cs = new ExecutorCompletionService<Void>(exec);
  281.             for (Runnable task : tasks) {
  282.                 cs.submit(task, (Void) null);
  283.                 ++taskCount;
  284.             }
  285.  
  286.             if (timeout < 0 || timeoutUnit == null) {
  287.                 while (taskCount > 0) {
  288.                     cs.take();
  289.                     --taskCount;
  290.                 }
  291.             } else {
  292.                 long timeoutNanos = timeoutUnit.toNanos(timeout);
  293.                 long limit = System.nanoTime() + timeoutNanos;
  294.                 while (taskCount > 0 && timeoutNanos >= 0) {
  295.                     if (cs.poll(timeoutNanos, TimeUnit.NANOSECONDS) != null) {
  296.                         --taskCount;
  297.                     }
  298.                     timeoutNanos = limit - System.nanoTime();
  299.                 }
  300.             }
  301.  
  302.             if (exceptions.isEmpty()) {
  303.                 return results;
  304.             } else {
  305.                 throw new PartialResultsException(results, exceptions.keySet());
  306.             }
  307.         }
  308.     }
  309.  
  310.  
  311.     static class Result<K, V> implements Serializable {
  312.         Result(K key, V value) {
  313.             this.key = key;
  314.             this.value = value;
  315.         }
  316.         final K key;
  317.         final V value;
  318.     }
  319.  
  320.     static class Task<K, V> implements Callable<Result<K, V>>, Serializable {
  321.         public Result<K, V> call() {
  322.             return new Result<K, V>(key, function.apply(key));
  323.         }
  324.  
  325.         Task(K key, Function<K, V> function) {
  326.             this.key = key;
  327.             this.function = function;
  328.         }
  329.         private final K key;
  330.         private final Function<K, V> function;
  331.     }
  332.  
  333.  
  334.     /**
  335.      * Default access rather than protected, because this class is not designed
  336.      * for extension in general.
  337.      */
  338.     ConcurrentFunction(Function<K, V> function, ExecutorService exec) {
  339.         this.function = checkNotNull(function, "function");
  340.         this.exec = checkNotNull(exec, "exec");
  341.     }
  342.  
  343.     protected final Function<K, V> function;
  344.     protected final ExecutorService exec;
  345.  
  346.     private static final Function<Object, Object> SELF_KEY = new Function<Object, Object>() {
  347.         public Object apply(Object key) {
  348.             return key;
  349.         }
  350.     };
  351.  
  352.     private static final Function<Object, Object> NO_KEY =  new Function<Object, Object>() {
  353.         public Object apply(Object key) {
  354.             return null;
  355.         }
  356.     };
  357. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement