Advertisement
d_stueken

ForkJoin sample

Jun 3rd, 2013
235
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5 3.05 KB | None | 0 0
  1. import java.util.ArrayList;
  2. import java.util.List;
  3. import java.util.Random;
  4. import java.util.concurrent.ForkJoinPool;
  5. import java.util.concurrent.ForkJoinTask;
  6. import java.util.concurrent.RecursiveAction;
  7. import java.util.concurrent.atomic.AtomicLong;
  8.  
  9. /**
  10.  * This is a small sample which mixes forked/joined task
  11.  * with asynchronously forked parallel tasks.
  12.  *
  13.  * The sample work task calculates some random values
  14.  * and increments a result counter just to waste some CPU.
  15.  */
  16. public class FJStalled {
  17.  
  18.     /**
  19.      * Execute given work, possibly split off some part of it.
  20.      * @param work to do.
  21.      */
  22.     void doWork(int work) {
  23.  
  24.         // this causes the problem ...
  25.         if(work>10)
  26.             work -= submitParallelWork(5);
  27.  
  28.         // waste some CPU by calculating many random values
  29.         Random r = new Random(result.get());
  30.  
  31.         double value = 0;
  32.         for(int i=0; i<1000*work; i++)
  33.             value += r.nextDouble();
  34.  
  35.         long increment = Math.round(value);
  36.  
  37.         // just to see some result
  38.         result.addAndGet(increment);
  39.     }
  40.  
  41.     // some asynchronously forked task
  42.     volatile ForkJoinTask<?> parallel = null;
  43.  
  44.     /**
  45.      * Try to fork some parallel chunk of work which is not joined.
  46.      * @param chunk available
  47.      * @return chunk being calculated asynchronously or 0
  48.      */
  49.     int submitParallelWork(int chunk) {
  50.  
  51.         // see if any other parallel work was done
  52.         if((parallel==null || parallel.isDone())) {
  53.             parallel = work(chunk).fork();
  54.             return chunk;
  55.         } else
  56.             return 0;
  57.     }
  58.  
  59.     /**
  60.      * Perform some work and possibly split it into about 10 sub tasks.
  61.      * @param work to do
  62.      * @return a ForkJoinTask to execute the work.
  63.      */
  64.     RecursiveAction work(final int work) {
  65.         return new RecursiveAction() {
  66.             @Override
  67.             protected void compute() {
  68.                 // split into smaller tasks
  69.                 if(work>100) {
  70.                     List<RecursiveAction> subtasks = new ArrayList<>(10);
  71.                     int todo = work;
  72.                     int chunk = work/10;
  73.  
  74.                     while(todo>0) {
  75.                         int count = Math.min(todo, chunk);
  76.                         subtasks.add(work(count));
  77.                         todo -= count;
  78.                     }
  79.  
  80.                     invokeAll(subtasks);
  81.                 } else {
  82.  
  83.                     // execute directly
  84.                     doWork(work);
  85.                 }
  86.             }
  87.         };
  88.     }
  89.  
  90.     // just a dummy value to be modified as a result ....
  91.     final AtomicLong result = new AtomicLong(0);
  92.  
  93.     public static void main(String ... args) {
  94.         final ForkJoinPool pool = new ForkJoinPool();
  95.  
  96.         FJStalled sample = new FJStalled();
  97.  
  98.         pool.invoke(sample.work(1000000));
  99.  
  100.         // wait for any parallel work submitted.
  101.         if(sample.parallel!=null)
  102.             sample.parallel.join();
  103.  
  104.         System.out.format("done: %d\n", sample.result.get());
  105.     }
  106. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement