Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package try2;
- import java.util.*;
- import java.util.concurrent.Callable;
- import java.util.concurrent.locks.Lock;
- import java.util.concurrent.locks.ReentrantLock;
- /**
- * Created by MacBook on 22.05.15.
- */
- class CalcObj{
- private Result result;
- private Callable<Double> task;
- public CalcObj(Result result, Callable<Double> task) {
- this.result = result;
- this.task = task;
- }
- public Result getResult() {
- return result;
- }
- public Callable<Double> getTask() {
- return task;
- }
- }
- public class PoolManager extends Thread{
- private class PoolThread extends Thread {
- private boolean status, hasTask;
- private Callable<Double> task;
- private Object inWork;
- public PoolThread() {
- status = hasTask = false;
- inWork = new Object();
- task = null;
- }
- @Override
- public void run() {
- this.status = true;
- while (status) {
- waitForTask();
- if (!status) break;
- doTask();
- prepareToNextTask();
- }
- }
- private void prepareToNextTask(){
- hasTask = false;
- task = null;
- synchronized (threadsSync){
- threads.add(this);
- threadsSync.notifyAll();
- }
- }
- private void waitForTask(){
- synchronized (inWork) {
- while (!hasTask && status) {
- try {
- inWork.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- private void doTask(){
- try {
- double taskResult = task.call();
- threadResultsLock.lock();
- resultsFromThreads.put(task, taskResult);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- threadResultsLock.unlock();
- }
- }
- public void setTask(Callable<Double> task){
- synchronized (inWork) {
- if (!hasTask) {
- this.task = task;
- hasTask = true;
- inWork.notify();
- }
- }
- }
- public void kill() {
- this.status = false;
- synchronized (inWork){
- inWork.notify();
- }
- }
- }
- private LinkedList<Callable<Double>> tasks;
- private LinkedList<PoolThread> threads;
- private boolean status, killCalled;
- private int numberOfThreads;
- private int maxNumberOfTasks;
- private Object tasksSync, threadsSync;
- private Lock killLock, resultsLock, threadResultsLock;
- private HashMap<Callable<Double>, Result> resultsToReturn;
- private HashMap<Callable<Double>, Double> resultsFromThreads;
- public PoolManager(int numberOfThreads, int maxNumberOfTasks) {
- if (numberOfThreads <= 0 || maxNumberOfTasks <= 0)
- throw new IllegalArgumentException("invalid input");
- this.numberOfThreads = numberOfThreads;
- this.maxNumberOfTasks = maxNumberOfTasks;
- this.threads = new LinkedList<PoolThread>();
- this.tasks = new LinkedList<Callable<Double>>();
- this.status = true;
- this.killCalled = false;
- this.tasksSync = new Object();
- this.threadsSync = new Object();
- this.killLock = new ReentrantLock();
- this.resultsLock = new ReentrantLock();
- this.threadResultsLock = new ReentrantLock();
- this.resultsFromThreads = new HashMap<Callable<Double>, Double>();
- this.resultsToReturn = new HashMap<Callable<Double>, Result>();
- this.initThreads();
- }
- private void initThreads() {
- for (int i = 0; i < numberOfThreads; i++) {
- PoolThread worker = new PoolThread();
- threads.add(worker);
- worker.start();
- }
- }
- public void execute(CalcObj calcObj) {
- if (killCalled)
- throw new IllegalArgumentException("pool thread is dead");
- else {
- synchronized (tasksSync) {
- tasks.add(calcObj.getTask());
- tasksSync.notify();
- }
- resultsLock.lock();
- try{
- resultsToReturn.putIfAbsent(calcObj.getTask(), calcObj.getResult());
- } finally {
- resultsLock.unlock();
- }
- }
- }
- public void kill() {
- killLock.lock();
- killCalled = true;
- try {
- if (status) {
- boolean readyToKillTasks = false, readyToKillThreads = false;
- while (!readyToKillTasks || !readyToKillThreads) {
- synchronized (tasksSync) {
- while (tasks.size() > 0) {
- try {
- tasksSync.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- readyToKillTasks = true;
- }
- synchronized (threadsSync) {
- while (threads.size() != numberOfThreads) {
- try {
- threadsSync.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- readyToKillThreads = true;
- }
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- this.status = false;
- for (PoolThread thread : threads)
- thread.kill();
- synchronized (tasksSync) {
- tasksSync.notify();
- }
- }
- } finally {
- killLock.unlock();
- }
- }
- public boolean isQueueFull() {
- boolean result = false;
- synchronized (tasksSync) {
- if (tasks.size() == maxNumberOfTasks)
- result = true;
- }
- return result;
- }
- @Override
- public void run() {
- while (status){
- Callable<Double> task = getNextTask();
- if (task == null) break;
- PoolThread worker = getFreeThread();
- worker.setTask(task);
- returnResults();
- }
- }
- private Callable<Double> getNextTask(){
- Callable<Double> result = null;
- synchronized (tasksSync) {
- while (tasks.size() == 0 && status) {
- try {
- tasksSync.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- if (tasks.size() > 0)
- result = tasks.removeFirst();
- tasksSync.notify();
- }
- return result;
- }
- private PoolThread getFreeThread(){
- PoolThread result = null;
- synchronized (threadsSync){
- while (threads.size() == 0){
- try {
- threadsSync.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- result = threads.removeFirst();
- }
- return result;
- }
- private void returnResults(){
- threadResultsLock.lock();
- resultsLock.lock();
- try{
- if (resultsFromThreads.size() > 0) {
- // Iterator iterator = resultsFromThreads.entrySet().iterator();
- //
- // while (iterator.hasNext()) {
- // Map.Entry pair = (Map.Entry) iterator.next();
- // double resFromThread = (Double) pair.getValue();
- // Callable<Double> key = (Callable<Double>) pair.getKey();
- // Result resToReturn = resultsToReturn.get(key);
- // resToReturn.add(resFromThread);
- //
- // iterator.remove();
- // resultsFromThreads.remove(key, resFromThread);
- // resultsToReturn.remove(key, resToReturn);
- // }
- Map.Entry[] arr = new Map.Entry[resultsFromThreads.size()];
- arr = resultsFromThreads.entrySet().toArray(arr);
- for (int i = 0; i < arr.length; i++) {
- Map.Entry pair = arr[i];
- double resFromThread = (Double) pair.getValue();
- Callable<Double> key = (Callable<Double>) pair.getKey();
- Result resToReturn = resultsToReturn.get(key);
- resToReturn.add(resFromThread);
- resultsFromThreads.remove(key, resFromThread);
- resultsToReturn.remove(key, resToReturn);
- }
- }
- } finally {
- threadResultsLock.unlock();
- resultsLock.unlock();
- }
- }
- public static void main(String[] args) throws InterruptedException {
- for (int j = 0; j < 100; j++) {
- PoolManager poolManager = new PoolManager(1, 50);
- poolManager.start();
- List list = new ArrayList();
- Lock lock = new ReentrantLock();
- for (int i = 0; i < 50; i++)
- poolManager.execute(new CalcObj(new Result(), new TAS(i + j + 1, list, lock)));
- poolManager.kill();
- poolManager.join();
- System.out.println(poolManager.isAlive());
- // Thread.sleep(1000);
- for (int i = 0; i < 1; i++)
- System.out.println(poolManager.threads.get(i).isAlive());
- }
- }
- }
/**
 * Demo task: prints a greeting with its id and the executing thread's name,
 * appends the id to a shared list while holding the supplied lock, and
 * returns {@code 3.14}.
 */
class TAS implements Callable<Double> {
    double id;
    // Typed instead of raw: only Double ids are ever added.
    List<Double> list;
    Lock lock;

    /**
     * @param i    identifier, stored as a double
     * @param list shared list that receives the id on each call
     * @param lock lock held while {@code list} is modified
     */
    public TAS(int i, List<Double> list, Lock lock) {
        id = i;
        this.list = list;
        this.lock = lock;
    }

    /**
     * Prints the greeting, records the id in the shared list under the lock.
     *
     * @return always {@code 3.14}
     */
    @Override
    public Double call() throws Exception {
        System.out.println("hi from id:" + id + " " + Thread.currentThread().getName());
        lock.lock();
        try {
            list.add(id);
        } finally {
            lock.unlock();
        }
        return 3.14;
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement