Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- import heapq
- class StopSim(BaseException):
- """Raised to stop the simulation"""
- class SimEventLoop(asyncio.AbstractEventLoop):
- def __init__(self):
- self._time = 0
- self._scheduled = []
- self._ready = []
- self._debug = False
- def get_debug(self):
- return self._debug
- def create_task(self, coro):
- task = asyncio.Task(coro, loop=self)
- return task
- def time(self):
- return self._time
- def call_soon(self, callback, *args):
- # print('call soon', callback)
- handle = asyncio.Handle(callback, args, self)
- handle._run()
- def call_later(self, delay, callback, *args):
- # print('call_later', delay, callback)
- timer = self.call_at(self.time() + delay, callback, *args)
- return timer
- def call_at(self, when, callback, *args):
- # print('call_at', when, callback)
- timer = asyncio.TimerHandle(when, callback, args, self)
- heapq.heappush(self._scheduled, timer)
- timer._scheduled = True
- return timer
- def _timer_handle_cancelled(self, handle):
- # print('hc')
- pass
- def _run_once(self):
- # print(self.time(), self._scheduled)
- while self._scheduled:
- handle = self._scheduled[0]
- if handle._when > self._time:
- break
- handle = heapq.heappop(self._scheduled)
- handle._scheduled = False
- handle._run()
- self._time += 1
- def run_forever(self):
- while True:
- try:
- self._run_once()
- except StopSim:
- break
- def run_until_complete(self, future):
- new_task = not isinstance(future, asyncio.Future)
- future = asyncio.ensure_future(future, loop=self)
- if new_task:
- future._log_destroy_pending = False
- def lol(fut):
- raise StopSim
- # print('done')
- future.add_done_callback(lol)
- try:
- self.run_forever()
- except:
- if new_task and future.done() and not future.cancelled():
- print('canc')
- future.exception()
- raise
- future.remove_done_callback(lol)
- return future.result()
- def delay(time, result=None, *, loop=None):
- async def _delay():
- """Coroutine that completes after a given time (in seconds)."""
- # based on asyncio.sleep
- future = asyncio.Future(loop=loop)
- h = future._loop.call_later(time,
- future._set_result_unless_cancelled, result)
- try:
- return (await future)
- finally:
- h.cancel()
- return _delay
- def now(loop=None):
- if loop is None:
- loop = asyncio.get_event_loop()
- return loop.time()
- def always(cond):
- def decorator(func):
- async def wrapper():
- while True:
- await cond()
- func()
- return wrapper
- return decorator
- @always(delay(3))
- def hi():
- t = asyncio.get_event_loop().time()
- print(now(), 'hi')
- def main():
- loop = SimEventLoop()
- asyncio.set_event_loop(loop)
- loop.run_until_complete(hi())
- if __name__ == '__main__':
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement