Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package;
- import neko.vm.Mutex;
- import neko.vm.Thread;
/**
 * A single worker owned by a `ThreadPool`.
 *
 * The worker runs `run()` on its own Neko thread, blocks on its message
 * queue, executes every `ThreadTask` it receives and reports the result back
 * to the pool through `ThreadPool.threadTaskComplete`.
 */
class PoolThread {
	private var _name:String;
	private var _pool:ThreadPool;
	private var _thread:Thread;

	/** Set by `kill()`; checked by the worker loop before and after each read. */
	private var _kill:Bool = false;

	/** Thread that requested the shutdown; receives the exit acknowledgement. */
	private var _killer:Thread = null;

	/**
	 * Starts the worker thread and blocks until it has acknowledged start-up.
	 *
	 * @param name Label stored for this worker (not otherwise used here).
	 * @param pool Pool that is notified when a task finishes.
	 */
	public function new(name:String, pool:ThreadPool) {
		_name = name;
		_pool = pool;
		_thread = Thread.create(run);
		// Handshake: hand the worker its creator and its pool, then wait for
		// the worker's reply so the constructor only returns once it is ready.
		_thread.sendMessage(Thread.current());
		_thread.sendMessage(_pool);
		Thread.readMessage(true);
	}

	/**
	 * Queues a task on this worker's message queue.
	 *
	 * @param task Task to execute on the worker thread.
	 */
	public function assign(task:ThreadTask) {
		_thread.sendMessage(task);
	}

	/**
	 * Asks the worker to stop.
	 *
	 * The worker finishes any task it is currently running, then leaves its
	 * loop and sends one `null` message to the thread that called `kill()`.
	 * Previously the acknowledgement always went to the thread that
	 * constructed the worker, so a caller on any other thread waiting for it
	 * would block forever.
	 */
	public function kill() {
		// Record the requester before raising the flag so the worker can
		// never observe `_kill == true` without a valid acknowledgement target.
		_killer = Thread.current();
		_kill = true;
		// Wake the worker if it is blocked in readMessage.
		_thread.sendMessage(null);
	}

	/**
	 * Worker thread body: start-up handshake, task loop, exit acknowledgement.
	 */
	@:access(ThreadPool)
	private function run() {
		var callingThread:Thread = Thread.readMessage(true);
		var pool:ThreadPool = Thread.readMessage(true);
		// Tell the constructor that the worker is up.
		callingThread.sendMessage(null);

		while (_kill == false) {
			var message = Thread.readMessage(true);
			if (_kill == true) {
				break;
			}
			// Anything that is not a ThreadTask (e.g. a wake-up null) is ignored.
			if (Std.is(message, ThreadTask)) {
				var task:ThreadTask = cast(message, ThreadTask);
				var result = task.task(task.arg);
				pool.threadTaskComplete(this, task, result);
			}
		}

		// Acknowledge the shutdown to whoever asked for it; fall back to the
		// creating thread if no requester was recorded.
		var ackTarget:Thread = (_killer != null) ? _killer : callingThread;
		ackTarget.sendMessage(null);
	}
}
/**
 * Plain record describing one unit of work for a worker thread:
 * the function to run, the argument to pass to it, and the callback that
 * receives its result.
 */
class ThreadTask {
	/** Work function; called with `arg`, its return value is the result. */
	public var task:Dynamic->Dynamic;

	/** Value handed to `task` when it is executed. */
	public var arg:Dynamic;

	/** Callback that is given the value returned by `task`. */
	public var onComplete:Dynamic->Void;

	/**
	 * Stores the three parts of the task unchanged.
	 *
	 * @param task Work function.
	 * @param arg Argument for the work function.
	 * @param onComplete Callback for the result.
	 */
	public function new(task:Dynamic->Dynamic, arg:Dynamic, onComplete:Dynamic->Void) {
		this.onComplete = onComplete;
		this.arg = arg;
		this.task = task;
	}
}
/**
 * Fixed-size pool of `PoolThread` workers fed from a FIFO task queue.
 *
 * `_lock` guards `_threadPool`, `_tasks` and `_activeTasks`, which are touched
 * both by callers of `addTask` and by worker threads reporting completion.
 */
class ThreadPool {
	/** Every worker ever created by this pool. */
	private var _threads:Array<PoolThread>;

	/** Workers that are currently idle and may be given a task. */
	private var _threadPool:Array<PoolThread>;

	private var _lock:Mutex;

	/** Tasks waiting for an idle worker, oldest first. */
	private var _tasks:Array<ThreadTask>;

	/** Tasks that have been handed to a worker and have not completed yet. */
	private var _activeTasks:Array<ThreadTask>;

	/**
	 * Creates the pool and starts all of its workers.
	 *
	 * @param size Number of worker threads to start.
	 * @param namePrefix Prefix for worker names; the worker index is appended.
	 */
	public function new(size:Int, namePrefix:String = "thread-") {
		_threads = [];
		_threadPool = [];
		_lock = new Mutex();
		_tasks = [];
		_activeTasks = [];
		for (i in 0...size) {
			var thread:PoolThread = new PoolThread('${namePrefix}${i}', this);
			_threads.push(thread);
			_threadPool.push(thread);
		}
	}

	/**
	 * Queues a task and dispatches it immediately if a worker is idle.
	 *
	 * @param task Function executed on a worker thread with `arg`.
	 * @param arg Argument passed to `task`.
	 * @param onComplete Called on the worker thread with the value returned
	 *        by `task`; may be null if the result is not needed.
	 */
	public function addTask(task:Dynamic->Dynamic, arg:Dynamic, onComplete:Dynamic->Void) {
		_lock.acquire();
		_tasks.push(new ThreadTask(task, arg, onComplete));
		_lock.release();
		checkTasks();
	}

	/**
	 * Asks every worker to stop, one after another.
	 *
	 * After each kill request this blocks until a message arrives on the
	 * calling thread, which is how a worker's exit acknowledgement is awaited.
	 */
	public function killAllThreads() {
		for (thread in _threads) {
			thread.kill();
			Thread.readMessage(true);
		}
	}

	/**
	 * Called by a worker when it has finished a task: returns the worker to
	 * the idle list, delivers the result and tries to dispatch the next task.
	 *
	 * @param thread Worker that ran the task.
	 * @param task Task that finished.
	 * @param result Value returned by the task function.
	 */
	private function threadTaskComplete(thread:PoolThread, task:ThreadTask, result:Dynamic) {
		_lock.acquire();
		_activeTasks.remove(task);
		_threadPool.push(thread);
		try {
			// The callback still runs under the lock, as before, so callbacks
			// from different workers never overlap.
			if (task.onComplete != null) {
				task.onComplete(result);
			}
		} catch (e:Dynamic) {
			// Never leave the pool mutex held by a failing callback; that
			// would block every later addTask/threadTaskComplete call.
			_lock.release();
			neko.Lib.rethrow(e);
		}
		_lock.release();
		checkTasks();
	}

	/**
	 * Hands the oldest queued task to an idle worker, if both exist.
	 *
	 * The worker and the task are taken in one critical section. Previously
	 * an idle worker was popped first and then dropped when the queue turned
	 * out to be empty, so the pool permanently lost a worker each time a task
	 * completed with nothing left to run.
	 */
	private function checkTasks() {
		_lock.acquire();
		if (_threadPool.length == 0 || _tasks.length == 0) {
			_lock.release();
			return;
		}
		var thread:PoolThread = _threadPool.pop();
		var task:ThreadTask = _tasks.shift();
		_activeTasks.push(task);
		_lock.release();
		// Send outside the lock; the worker may call back into the pool.
		thread.assign(task);
	}
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement