Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package;
- import neko.vm.Mutex;
- import neko.vm.Thread;
- class PoolThread {
- private var _name:String;
- private var _pool:ThreadPool;
- private var _thread:Thread;
- public function new(name:String, pool:ThreadPool) {
- _name = name;
- _pool = pool;
- _thread = Thread.create(run);
- _thread.sendMessage(Thread.current());
- _thread.sendMessage(_pool);
- Thread.readMessage(true);
- }
- public function assign(task:ThreadTask) {
- _thread.sendMessage(task);
- }
- @:access(ThreadPool)
- private function run() {
- trace('${_name} running\n');
- var callingThread:Thread = Thread.readMessage(true);
- var pool:ThreadPool = Thread.readMessage(true);
- callingThread.sendMessage(null);
- while (true) {
- trace('${_name} waiting\n');
- var message = Thread.readMessage(true);
- if (Std.is(message, ThreadTask)) {
- var task:ThreadTask = cast(message, ThreadTask);
- trace('${_name} assigned\n');
- var result = task.task(task.arg);
- trace('${_name} complete - ${result}\n');
- pool.threadTaskComplete(this);
- }
- }
- trace('${_name} ended\n');
- }
- }
- class ThreadTask {
- public var task:Dynamic->Dynamic;
- public var arg:Dynamic;
- public function new(task:Dynamic->Dynamic, arg:Dynamic) {
- this.task = task;
- this.arg = arg;
- }
- }
- class ThreadPool {
- private var _threads:Array<PoolThread>;
- private var _threadPool:Array<PoolThread>;
- private var _lock:Mutex;
- private var _tasks:Array<ThreadTask>;
- public function new(size:Int, namePrefix:String = "thread-") {
- _threads = [];
- _threadPool = [];
- _lock = new Mutex();
- _tasks = [];
- for (i in 0...size) {
- var thread:PoolThread = new PoolThread('${namePrefix}${i}', this);
- _threads.push(thread);
- _threadPool.push(thread);
- }
- }
- public function addTask(task:Dynamic->Dynamic, arg:Dynamic = null) {
- _lock.acquire();
- _tasks.push(new ThreadTask(task, arg));
- _lock.release();
- checkTasks();
- }
- private function threadTaskComplete(thread:PoolThread) {
- _lock.acquire();
- _threadPool.push(thread);
- _lock.release();
- checkTasks();
- }
- private function checkTasks() {
- var thread:PoolThread = _threadPool.pop();
- if (thread == null) {
- return;
- }
- if (_tasks.length == 0) {
- trace("DONE!");
- //Thread.current().sendMessage("all complete");
- return;
- }
- _lock.acquire();
- var task = _tasks[0];
- _tasks.remove(task);
- _lock.release();
- thread.assign(task);
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement