Advertisement
Guest User

Untitled

a guest
May 24th, 2015
186
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 10.61 KB | None | 0 0
  1. package try2;
  2.  
  3. import java.util.*;
  4. import java.util.concurrent.Callable;
  5. import java.util.concurrent.locks.Lock;
  6. import java.util.concurrent.locks.ReentrantLock;
  7.  
  8. /**
  9.  * Created by MacBook on 22.05.15.
  10.  */
  11. class CalcObj{
  12.     private Result result;
  13.     private Callable<Double> task;
  14.  
  15.     public CalcObj(Result result, Callable<Double> task) {
  16.         this.result = result;
  17.         this.task = task;
  18.     }
  19.  
  20.     public Result getResult() {
  21.         return result;
  22.     }
  23.  
  24.     public Callable<Double> getTask() {
  25.         return task;
  26.     }
  27. }
  28.  
  29. public class PoolManager extends Thread{
  30.     private class PoolThread extends Thread {
  31.         private boolean status, hasTask;
  32.         private Callable<Double> task;
  33.         private Object inWork;
  34.  
  35.         public PoolThread() {
  36.             status = hasTask = false;
  37.             inWork = new Object();
  38.             task = null;
  39.         }
  40.  
  41.         @Override
  42.         public void run() {
  43.             this.status = true;
  44.  
  45.             while (status) {
  46.                 waitForTask();
  47.                 if (!status) break;
  48.                 doTask();
  49.                 prepareToNextTask();
  50.             }
  51.         }
  52.  
  53.         private void prepareToNextTask(){
  54.             hasTask = false;
  55.             task = null;
  56.  
  57.             synchronized (threadsSync){
  58.                 threads.add(this);
  59.                 threadsSync.notifyAll();
  60.             }
  61.         }
  62.  
  63.         private void waitForTask(){
  64.             synchronized (inWork) {
  65.                 while (!hasTask && status) {
  66.                     try {
  67.                         inWork.wait();
  68.                     } catch (InterruptedException e) {
  69.                         e.printStackTrace();
  70.                     }
  71.                 }
  72.             }
  73.         }
  74.  
  75.         private void doTask(){
  76.             try {
  77.                 double taskResult = task.call();
  78.                 threadResultsLock.lock();
  79.                 resultsFromThreads.put(task, taskResult);
  80.             } catch (Exception e) {
  81.                 e.printStackTrace();
  82.             } finally {
  83.                 threadResultsLock.unlock();
  84.             }
  85.         }
  86.  
  87.         public void setTask(Callable<Double> task){
  88.             synchronized (inWork) {
  89.                 if (!hasTask) {
  90.                     this.task = task;
  91.                     hasTask = true;
  92.                     inWork.notify();
  93.                 }
  94.             }
  95.         }
  96.  
  97.         public void kill() {
  98.             this.status = false;
  99.             synchronized (inWork){
  100.                 inWork.notify();
  101.             }
  102.         }
  103.     }
  104.  
  105.     private LinkedList<Callable<Double>> tasks;
  106.     private LinkedList<PoolThread> threads;
  107.     private boolean status, killCalled;
  108.     private int numberOfThreads;
  109.     private int maxNumberOfTasks;
  110.     private Object tasksSync, threadsSync;
  111.     private Lock killLock, resultsLock, threadResultsLock;
  112.     private HashMap<Callable<Double>, Result> resultsToReturn;
  113.     private HashMap<Callable<Double>, Double> resultsFromThreads;
  114.  
  115.     public PoolManager(int numberOfThreads, int maxNumberOfTasks) {
  116.         if (numberOfThreads <= 0 || maxNumberOfTasks <= 0)
  117.             throw new IllegalArgumentException("invalid input");
  118.  
  119.         this.numberOfThreads = numberOfThreads;
  120.         this.maxNumberOfTasks = maxNumberOfTasks;
  121.         this.threads = new LinkedList<PoolThread>();
  122.         this.tasks = new LinkedList<Callable<Double>>();
  123.         this.status = true;
  124.         this.killCalled = false;
  125.         this.tasksSync = new Object();
  126.         this.threadsSync = new Object();
  127.         this.killLock = new ReentrantLock();
  128.         this.resultsLock = new ReentrantLock();
  129.         this.threadResultsLock = new ReentrantLock();
  130.         this.resultsFromThreads = new HashMap<Callable<Double>, Double>();
  131.         this.resultsToReturn = new HashMap<Callable<Double>, Result>();
  132.         this.initThreads();
  133.     }
  134.  
  135.     private void initThreads() {
  136.         for (int i = 0; i < numberOfThreads; i++) {
  137.             PoolThread worker = new PoolThread();
  138.             threads.add(worker);
  139.             worker.start();
  140.         }
  141.     }
  142.  
  143.     public void execute(CalcObj calcObj) {
  144.         if (killCalled)
  145.             throw new IllegalArgumentException("pool thread is dead");
  146.         else {
  147.             synchronized (tasksSync) {
  148.                 tasks.add(calcObj.getTask());
  149.                 tasksSync.notify();
  150.             }
  151.             resultsLock.lock();
  152.             try{
  153.                 resultsToReturn.putIfAbsent(calcObj.getTask(), calcObj.getResult());
  154.             } finally {
  155.                 resultsLock.unlock();
  156.             }
  157.         }
  158.     }
  159.  
  160.     public void kill() {
  161.         killLock.lock();
  162.         killCalled = true;
  163.         try {
  164.             if (status) {
  165.                 boolean readyToKillTasks = false, readyToKillThreads = false;
  166.  
  167.                 while (!readyToKillTasks || !readyToKillThreads) {
  168.                     synchronized (tasksSync) {
  169.                         while (tasks.size() > 0) {
  170.                             try {
  171.                                 tasksSync.wait();
  172.                             } catch (InterruptedException e) {
  173.                                 e.printStackTrace();
  174.                             }
  175.                         }
  176.                         readyToKillTasks = true;
  177.                     }
  178.                     synchronized (threadsSync) {
  179.                         while (threads.size() != numberOfThreads) {
  180.                             try {
  181.                                 threadsSync.wait();
  182.                             } catch (InterruptedException e) {
  183.                                 e.printStackTrace();
  184.                             }
  185.                         }
  186.                         readyToKillThreads = true;
  187.                     }
  188.                     try {
  189.                         Thread.sleep(10);
  190.                     } catch (InterruptedException e) {
  191.                         e.printStackTrace();
  192.                     }
  193.                 }
  194.  
  195.                 this.status = false;
  196.  
  197.                 for (PoolThread thread : threads)
  198.                     thread.kill();
  199.  
  200.                 synchronized (tasksSync) {
  201.                     tasksSync.notify();
  202.                 }
  203.             }
  204.         } finally {
  205.             killLock.unlock();
  206.         }
  207.     }
  208.  
  209.     public boolean isQueueFull() {
  210.         boolean result = false;
  211.  
  212.         synchronized (tasksSync) {
  213.             if (tasks.size() == maxNumberOfTasks)
  214.                 result = true;
  215.         }
  216.  
  217.         return result;
  218.     }
  219.  
  220.     @Override
  221.     public void run() {
  222.         while (status){
  223.             Callable<Double> task = getNextTask();
  224.             if (task == null) break;
  225.  
  226.             PoolThread worker = getFreeThread();
  227.             worker.setTask(task);
  228.             returnResults();
  229.         }
  230.     }
  231.  
  232.     private Callable<Double> getNextTask(){
  233.         Callable<Double> result = null;
  234.  
  235.         synchronized (tasksSync) {
  236.             while (tasks.size() == 0 && status) {
  237.                 try {
  238.                     tasksSync.wait();
  239.                 } catch (InterruptedException e) {
  240.                     e.printStackTrace();
  241.                 }
  242.             }
  243.             if (tasks.size() > 0)
  244.                 result = tasks.removeFirst();
  245.  
  246.             tasksSync.notify();
  247.         }
  248.         return result;
  249.     }
  250.  
  251.     private PoolThread getFreeThread(){
  252.         PoolThread result = null;
  253.  
  254.         synchronized (threadsSync){
  255.             while (threads.size() == 0){
  256.                 try {
  257.                     threadsSync.wait();
  258.                 } catch (InterruptedException e) {
  259.                     e.printStackTrace();
  260.                 }
  261.             }
  262.             result = threads.removeFirst();
  263.         }
  264.         return result;
  265.     }
  266.  
  267.     private void returnResults(){
  268.         threadResultsLock.lock();
  269.         resultsLock.lock();
  270.         try{
  271.             if (resultsFromThreads.size() > 0) {
  272. //                Iterator iterator = resultsFromThreads.entrySet().iterator();
  273. //
  274. //                while (iterator.hasNext()) {
  275. //                    Map.Entry pair = (Map.Entry) iterator.next();
  276. //                    double resFromThread = (Double) pair.getValue();
  277. //                    Callable<Double> key = (Callable<Double>) pair.getKey();
  278. //                    Result resToReturn = resultsToReturn.get(key);
  279. //                    resToReturn.add(resFromThread);
  280. //
  281. //                    iterator.remove();
  282. //                    resultsFromThreads.remove(key, resFromThread);
  283. //                    resultsToReturn.remove(key, resToReturn);
  284. //                }
  285.  
  286.                 Map.Entry[] arr = new Map.Entry[resultsFromThreads.size()];
  287.                         arr = resultsFromThreads.entrySet().toArray(arr);
  288.                 for (int i = 0; i < arr.length; i++) {
  289.                     Map.Entry pair = arr[i];
  290.                     double resFromThread = (Double) pair.getValue();
  291.                     Callable<Double> key = (Callable<Double>) pair.getKey();
  292.                     Result resToReturn = resultsToReturn.get(key);
  293.                     resToReturn.add(resFromThread);
  294.  
  295.                     resultsFromThreads.remove(key, resFromThread);
  296.                     resultsToReturn.remove(key, resToReturn);
  297.                 }
  298.             }
  299.         } finally {
  300.             threadResultsLock.unlock();
  301.             resultsLock.unlock();
  302.         }
  303.     }
  304.  
  305.     public static void main(String[] args) throws InterruptedException {
  306.         for (int j = 0; j < 100; j++) {
  307.  
  308.             PoolManager poolManager = new PoolManager(1, 50);
  309.             poolManager.start();
  310.             List list = new ArrayList();
  311.             Lock lock = new ReentrantLock();
  312.  
  313.             for (int i = 0; i < 50; i++)
  314.                 poolManager.execute(new CalcObj(new Result(), new TAS(i + j + 1, list, lock)));
  315.  
  316.             poolManager.kill();
  317.             poolManager.join();
  318.  
  319.             System.out.println(poolManager.isAlive());
  320. //            Thread.sleep(1000);
  321.             for (int i = 0; i < 1; i++)
  322.                 System.out.println(poolManager.threads.get(i).isAlive());
  323.         }
  324.  
  325.     }
  326. }
  327.  
  328. class TAS implements Callable<Double>{
  329.     double id;
  330.     List list;
  331.     Lock lock;
  332.  
  333.  
  334.     public TAS(int i, List list, Lock lock) {
  335.         id = i;
  336.         this.list = list;
  337.         this.lock = lock;
  338.  
  339.     }
  340.  
  341.     @Override
  342.     public Double call() throws Exception {
  343.         System.out.println("hi from id:" + id + " " + Thread.currentThread().getName());
  344.         lock.lock();
  345.         try {
  346.             list.add(id);
  347.         } finally {
  348.             lock.unlock();
  349.         }
  350. //        Thread.sleep(1000);
  351.  
  352.         return 3.14;
  353.     }
  354. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement