Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import queue
- import time
- import threading
- from listeners.tick import GameThread
- from listeners import OnTick
class WorkerThread(GameThread):
    """Background thread that runs queued jobs one at a time.

    Jobs are added with :meth:`add_job` and executed in FIFO order by
    :meth:`run`. When a job was queued with a ``result_handler``, the pair
    ``(result_handler, result)`` is put on the class-level
    :attr:`result_queue` instead of calling the handler from this thread.
    """

    # Shared by ALL WorkerThread instances (class attribute): finished
    # results wait here until some consumer picks them up.
    result_queue = queue.Queue()

    # Seconds to block waiting for a job before re-checking the stop flag.
    # This bounds how long stop() can take while the worker is idle.
    POLL_INTERVAL = 0.1

    def __init__(self):
        """Create the stop flag and the per-instance job queue."""
        super().__init__()
        self._stop_thread = threading.Event()
        self.queue = queue.Queue()

    def stop(self):
        """Ask the worker loop to exit and block until the thread is done.

        A job that is currently executing is not interrupted; the loop only
        exits once that job returns.
        """
        self._stop_thread.set()
        self.join()

    def run(self):
        """Execute queued jobs until :meth:`stop` is called.

        A job that raises is reported via ``traceback.print_exc()`` and
        skipped (its result handler is not scheduled); the loop then keeps
        serving later jobs instead of letting the exception end the thread.
        """
        # Local import: only needed for reporting failed jobs.
        import traceback

        while not self._stop_thread.is_set():
            try:
                # Block for a short time rather than poll-and-sleep, so a
                # new job starts immediately while the stop flag is still
                # checked every POLL_INTERVAL seconds.
                job = self.queue.get(timeout=self.POLL_INTERVAL)
            except queue.Empty:
                continue

            exec_func, args, kwargs, result_handler = job
            try:
                result = exec_func(*args, **kwargs)
            except Exception:
                traceback.print_exc()
                continue
            finally:
                # Always balance the get() above, even for failed jobs, so
                # queue.join() cannot hang on a job that raised.
                self.queue.task_done()

            if result_handler is not None:
                self.result_queue.put_nowait((result_handler, result))

    def add_job(self, exec_func, args=(), kwargs=None, result_handler=None):
        """Queue ``exec_func(*args, **kwargs)`` for execution by the worker.

        :param exec_func: Callable to run in the worker thread.
        :param args: Positional arguments for ``exec_func``.
        :param kwargs: Keyword arguments for ``exec_func``; ``None`` means
            no keyword arguments.
        :param result_handler: Optional callable taking the job's return
            value. It is not called here; it is put on
            :attr:`result_queue` together with the result once the job has
            finished successfully.
        """
        self.queue.put_nowait(
            (exec_func, args, {} if kwargs is None else kwargs,
             result_handler))
# Finished results are consumed from a tick listener so that each
# result_handler is executed in the main thread again instead of in the
# worker thread.
@OnTick
def on_tick():
    """Hand at most one finished job result to its handler per tick."""
    pending = WorkerThread.result_queue
    if not pending.empty():
        handler, value = pending.get_nowait()
        handler(value)
        pending.task_done()
# Test code: create the module-level worker used by the command below and
# start its thread right away.
worker = WorkerThread()
worker.start()
- from commands.typed import TypedServerCommand
@TypedServerCommand('sleep')
def on_sleep(info, duration: float):
    """Queue a ``time.sleep(duration)`` job on the worker thread.

    Once the job has finished, its result handler prints ``SLEEPT``
    followed by the requested duration.
    """
    def report_done(_result):
        # The job's return value (None from time.sleep) is not used.
        print('SLEEPT', duration)

    worker.add_job(time.sleep, args=(duration,), result_handler=report_done)
def unload():
    """Stop the worker thread and wait for it to finish.

    NOTE(review): presumably called by the plugin loader when this plugin
    is unloaded -- confirm against the host's plugin conventions.
    """
    worker.stop()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement