Advertisement
Guest User

Untitled

a guest
Oct 28th, 2020 (edited)
115
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Haxe 2.83 KB | None | 0 0
  1. package async;
  2.  
  3. import sys.thread.Thread;
  4.  
  5. class Loop {
  6.     static var stack = new Array<StackElement>();
  7.     static final threads = new Array<{thread:Thread, taskContainer:TaskContainer}>();
  8.     static final pendingNotifications = new Array<{thread:Thread, value:Dynamic}>();
  9.     public static var threadCount = 0;
  10.  
  11.     public static function enque(task:() -> Void, startImmediately = false) {
  12.         if (startImmediately) {
  13.             stack.push(Fiber(Thread.current()));
  14.             stack.push(Task(task));
  15.             startNext();
  16.             awaitWakeup();
  17.         } else {
  18.             stack.push(Task(task));
  19.         }
  20.     }
  21.  
  22.     public static function onTaskDone() {
  23.         final taskContainer = new TaskContainer();
  24.         threads.push({thread: Thread.current(), taskContainer: taskContainer});
  25.         startNext();
  26.         awaitWakeup();
  27.         return taskContainer.task.ifNull(() -> () -> {});
  28.     }
  29.  
  30.     public static function pendNotification(thread:Thread, value:Dynamic) {
  31.         pendingNotifications.push({thread: thread, value: value});
  32.     }
  33.  
  34.     static function startNext():Bool {
  35.         final notification = pendingNotifications.pop();
  36.         return switch notification {
  37.             case null:
  38.                 final next = stack.pop();
  39.                 switch next {
  40.                     case null:
  41.                         false;
  42.                     case Fiber(x):
  43.                         x.sendMessage(0);
  44.                         true;
  45.                     case Task(x):
  46.                         prepareThread(x).sendMessage(0);
  47.                         true;
  48.                 }
  49.             case {thread: thread, value: value}:
  50.                 thread.sendMessage(value);
  51.                 true;
  52.         }
  53.     }
  54.  
  55.     static function prepareThread(task:Void->Void) {
  56.         final pooled = threads.pop();
  57.         final thread = if (pooled == null) {
  58.             createThread(task);
  59.         } else {
  60.             pooled.taskContainer.task = task;
  61.             pooled.thread;
  62.         }
  63.         return thread;
  64.     }
  65.  
  66.     static function createThread(initialTask:Void->Void) {
  67.         threadCount++;
  68.         return Thread.create(() -> {
  69.             awaitWakeup();
  70.             initialTask();
  71.             while (true) {
  72.                 final nextTask = onTaskDone();
  73.                 nextTask();
  74.             }
  75.         });
  76.     }
  77.  
  78.     static function awaitWakeup() {
  79.         if (Thread.readMessage(true) != 0) {
  80.             throw "Invalid wakeup for thread in event loop!";
  81.         }
  82.     }
  83.  
  84.     public static function awaitValue():Dynamic {
  85.         if (!startNext())
  86.             throw "Awaiting value with empty event loop!";
  87.         return Thread.readMessage(true);
  88.     }
  89.  
  90.     static function await(next:Thread, message:Dynamic) {
  91.         stack.push(Fiber(Thread.current()));
  92.         next.sendMessage(message);
  93.         awaitWakeup();
  94.     }
  95.  
  96.     public static function process() {
  97.         while (true) {
  98.             switch (pendingNotifications.pop()) {
  99.                 case null:
  100.                     switch stack.pop() {
  101.                         case null:
  102.                             break;
  103.                         case Fiber(x):
  104.                             await(x, 0);
  105.                         case Task(x):
  106.                             await(prepareThread(x), 0);
  107.                     }
  108.                 case {thread: thread, value: value}:
  109.                     await(thread, value);
  110.             }
  111.         }
  112.     }
  113. }
  114.  
  115. private class TaskContainer {
  116.     public var task:Null<Void->Void>;
  117.  
  118.     public function new() {}
  119. }
  120.  
  121. private enum StackElement {
  122.     Fiber(x:Thread);
  123.     Task(x:Void->Void);
  124. }
  125.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement