Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package async;
- import sys.thread.Thread;
- class Loop {
- static var stack = new Array<StackElement>();
- static final threads = new Array<{thread:Thread, taskContainer:TaskContainer}>();
- static final pendingNotifications = new Array<{thread:Thread, value:Dynamic}>();
- public static var threadCount = 0;
- public static function enque(task:() -> Void, startImmediately = false) {
- if (startImmediately) {
- stack.push(Fiber(Thread.current()));
- stack.push(Task(task));
- startNext();
- awaitWakeup();
- } else {
- stack.push(Task(task));
- }
- }
- public static function onTaskDone() {
- final taskContainer = new TaskContainer();
- threads.push({thread: Thread.current(), taskContainer: taskContainer});
- startNext();
- awaitWakeup();
- return taskContainer.task.ifNull(() -> () -> {});
- }
- public static function pendNotification(thread:Thread, value:Dynamic) {
- pendingNotifications.push({thread: thread, value: value});
- }
- static function startNext():Bool {
- final notification = pendingNotifications.pop();
- return switch notification {
- case null:
- final next = stack.pop();
- switch next {
- case null:
- false;
- case Fiber(x):
- x.sendMessage(0);
- true;
- case Task(x):
- prepareThread(x).sendMessage(0);
- true;
- }
- case {thread: thread, value: value}:
- thread.sendMessage(value);
- true;
- }
- }
- static function prepareThread(task:Void->Void) {
- final pooled = threads.pop();
- final thread = if (pooled == null) {
- createThread(task);
- } else {
- pooled.taskContainer.task = task;
- pooled.thread;
- }
- return thread;
- }
- static function createThread(initialTask:Void->Void) {
- threadCount++;
- return Thread.create(() -> {
- awaitWakeup();
- initialTask();
- while (true) {
- final nextTask = onTaskDone();
- nextTask();
- }
- });
- }
- static function awaitWakeup() {
- if (Thread.readMessage(true) != 0) {
- throw "Invalid wakeup for thread in event loop!";
- }
- }
- public static function awaitValue():Dynamic {
- if (!startNext())
- throw "Awaiting value with empty event loop!";
- return Thread.readMessage(true);
- }
- static function await(next:Thread, message:Dynamic) {
- stack.push(Fiber(Thread.current()));
- next.sendMessage(message);
- awaitWakeup();
- }
- public static function process() {
- while (true) {
- switch (pendingNotifications.pop()) {
- case null:
- switch stack.pop() {
- case null:
- break;
- case Fiber(x):
- await(x, 0);
- case Task(x):
- await(prepareThread(x), 0);
- }
- case {thread: thread, value: value}:
- await(thread, value);
- }
- }
- }
- }
- private class TaskContainer {
- public var task:Null<Void->Void>;
- public function new() {}
- }
- private enum StackElement {
- Fiber(x:Thread);
- Task(x:Void->Void);
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement