Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package test1.tasks;
- import java.util.LinkedList;
/**
 * A unit of work that takes no input and produces a value of type {@code TRet}.
 *
 * <p>The interface declares exactly one abstract method, so an implementation
 * can be written as a lambda expression or a method reference.
 *
 * @param <TRet> type of the value produced by the action
 */
@FunctionalInterface
public interface TaskAction<TRet> {

    /**
     * Runs the action.
     *
     * @return the value produced by the action
     * @throws InterruptedException declared so that implementations may make
     *         interruptible blocking calls without wrapping the exception
     */
    TRet action() throws InterruptedException;
}
/**
 * A unit of work that maps one argument of type {@code TArg} to a value of
 * type {@code TRet}.
 *
 * <p>The interface declares exactly one abstract method, so an implementation
 * can be written as a lambda expression or a method reference.
 *
 * @param <TArg> type of the argument handed to the action
 * @param <TRet> type of the value produced by the action
 */
@FunctionalInterface
public interface ParametrizedTaskAction<TArg, TRet> {

    /**
     * Runs the action on the given argument.
     *
     * @param arg the input value
     * @return the value produced by the action
     * @throws InterruptedException declared so that implementations may make
     *         interruptible blocking calls without wrapping the exception
     */
    TRet action(TArg arg) throws InterruptedException;
}
- public interface Task<TRet> {
- TRet result() throws InterruptedException;
- boolean isCompleted();
- <R> Task<R> continueWith(ParametrizedTaskAction<TRet, R> act);
- }
- public class TaskManager {
- private abstract class TaskImplBase {
- protected final Object _lock = new Object();
- protected boolean _isCompleted;
- protected TaskImplBase _next;
- public TaskImplBase() {
- }
- public TaskImplBase execute() throws InterruptedException {
- this.executeImpl();
- synchronized (_lock) {
- _isCompleted = true;
- _lock.notifyAll();
- return _next;
- }
- }
- protected abstract void executeImpl() throws InterruptedException;
- public boolean isCompleted() {
- synchronized (_lock) {
- return _isCompleted;
- }
- }
- }
- private class TaskImpl<TRet> extends TaskImplBase implements Task<TRet> {
- private final TaskAction<TRet> _action;
- private TRet _result;
- public TaskImpl(TaskAction<TRet> action) {
- _action = action;
- }
- @Override
- protected void executeImpl() throws InterruptedException {
- _result = _action.action();
- }
- @Override
- public TRet result() throws InterruptedException {
- synchronized (_lock) {
- if (_isCompleted)
- return _result;
- _lock.wait();
- return _result;
- }
- }
- @Override
- public <R> Task<R> continueWith(ParametrizedTaskAction<TRet, R> act) {
- synchronized (_lock) {
- TaskAction<R> action = () -> act.action(TaskImpl.this.result());
- if (_isCompleted) {
- return TaskManager.this.start(action);
- } else {
- TaskImpl<R> next = TaskManager.this.new TaskImpl<R>(action);
- _next = next;
- return next;
- }
- }
- }
- }
- private class TaskThread extends Thread {
- private final Object _lock = new Object();
- private TaskImplBase _currTask;
- @Override
- public void run() {
- try {
- do {
- synchronized (_lock) {
- if(_currTask == null)
- _lock.wait();
- }
- _currTask = _currTask.execute();
- } while (_currTask != null);
- TaskManager.this.releaseTaskThread(this);
- } catch (Exception e) {
- // TODO: handle exception
- System.out.println(e.toString());
- }
- }
- public void doWork(TaskImplBase task) {
- synchronized (_lock) {
- _currTask = task;
- _lock.notify();
- }
- }
- }
- private final Object _syncRoot = new Object();
- private final LinkedList<TaskThread> _availableThreads = new LinkedList<>();
- private final LinkedList<TaskThread> _activeThreads = new LinkedList<>();
- public TaskManager() {
- }
- private void releaseTaskThread(TaskThread th) {
- synchronized (_syncRoot) {
- _activeThreads.remove(th);
- _availableThreads.addLast(th);
- }
- }
- public <R> Task<R> start(TaskAction<R> act) {
- synchronized (_syncRoot) {
- TaskImpl<R> task = new TaskImpl<R>(act);
- TaskThread th;
- if (_availableThreads.size() > 0) {
- th = _availableThreads.removeFirst();
- th.doWork(task);
- } else {
- th = new TaskThread();
- th.doWork(task);
- th.start();
- }
- _activeThreads.addLast(th);
- return task;
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement