Advertisement
dig090

Limited Outstanding Tasks

Feb 14th, 2012
413
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 2.26 KB | None | 0 0
  1. package com.j256;
  2.  
  3. import java.util.ArrayList;
  4. import java.util.Iterator;
  5. import java.util.List;
  6. import java.util.concurrent.Callable;
  7. import java.util.concurrent.ExecutorService;
  8. import java.util.concurrent.Executors;
  9. import java.util.concurrent.Future;
  10.  
  11. /**
  12.  * Class to show how you can process a large number of requests while keeping the memory
  13.  * requirements down.  Right now it sleeps and loops but you could use a notify/wait to
  14.  * speed up the loop a bit.
  15.  */
  16. public class LimitedOutandingTasks {
  17.  
  18.     private static final int NUM_THREADS = 200;
  19.     private static final int NUM_JOBS = 100000;
  20.     private static final int MAX_OUTSTANDING = 1000;
  21.     private static final long SLEEP_BETWEEN_REAPS = 10;
  22.  
  23.     public static void main(String[] args) throws Exception {
  24.         new LimitedOutandingTasks().doMain(args);
  25.     }
  26.  
  27.     private void doMain(String[] args) throws Exception {
  28.  
  29.         ExecutorService pool = Executors.newFixedThreadPool(NUM_THREADS);
  30.         List<Future<Result>> futureList = new ArrayList<Future<Result>>();
  31.  
  32.         int jobC = 0;
  33.         while (true) {
  34.             while (futureList.size() >= MAX_OUTSTANDING) {
  35.                 Thread.sleep(SLEEP_BETWEEN_REAPS);
  36.                 reapFutures(futureList);
  37.             }
  38.             // make our job...
  39.             futureList.add(pool.submit(new OurThread()));
  40.             jobC++;
  41.             if (jobC >= NUM_JOBS) {
  42.                 break;
  43.             }
  44.         }
  45.         System.err.println("Done submitting.");
  46.         while (futureList.size() > 0) {
  47.             Thread.sleep(SLEEP_BETWEEN_REAPS);
  48.             reapFutures(futureList);
  49.         }
  50.         pool.shutdown();
  51.         System.err.println("Done reaping.");
  52.     }
  53.  
  54.     private void reapFutures(List<Future<Result>> futureList) {
  55.         Iterator<Future<Result>> iterator = futureList.iterator();
  56.         int reapC = 0;
  57.         while (iterator.hasNext()) {
  58.             Future<Result> future = iterator.next();
  59.             if (!future.isDone() && !future.isCancelled()) {
  60.                 continue;
  61.             }
  62.             iterator.remove();
  63.             reapC++;
  64.             Result result;
  65.             try {
  66.                 result = future.get();
  67.             } catch (Exception e) {
  68.                 e.printStackTrace();
  69.                 continue;
  70.             }
  71.  
  72.             // process the result
  73.         }
  74.         System.err.println("Reaped " + reapC);
  75.     }
  76.  
  77.     private static class OurThread implements Callable<Result> {
  78.         public Result call() {
  79.             // do something...
  80.             return new Result(/* ... */);
  81.         }
  82.     }
  83.  
  84.     private static class Result {
  85.         public Result() {
  86.         }
  87.     }
  88. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement