Advertisement
DeaD_EyE

sched with coroutines and timezone-aware datetime

Nov 20th, 2023
879
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.07 KB | None | 0 0
  1. import time
  2. import asyncio
  3.  
  4. from datetime import datetime as DateTime
  5. from datetime import timedelta as TimeDelta
  6. from datetime import timezone as TimeZone
  7. from zoneinfo import ZoneInfo
  8. from sched import scheduler
  9.  
  10. UTC = TimeZone.utc
  11.  
  12.  
  13. def timefunc():
  14.     return DateTime.now(ZoneInfo("Europe/Berlin"))
  15.  
  16.  
  17. def delayfunc(delay):
  18.     if isinstance(delay, TimeDelta):
  19.         delay = delay.total_seconds()
  20.     time.sleep(delay)
  21.  
  22.  
  23. def wrap_sync(func, *args1, **kwargs):
  24.     async def run(*args, **kwargs):
  25.         return func(*args, **kwargs)
  26.  
  27.     def wrapped(*args2):
  28.         asyncio.create_task(run(*args1, *args2, **kwargs))
  29.  
  30.     return wrapped
  31.  
  32.  
  33. async def main():
  34.     task_runner = scheduler(timefunc, delayfunc)
  35.     for delay in range(5, 31, 5):
  36.         abs_time = DateTime.now(ZoneInfo("Europe/Berlin")) + TimeDelta(seconds=delay)
  37.         task_runner.enterabs(abs_time, 0, wrap_sync(print, delay, abs_time))
  38.  
  39.     while task_runner.queue:
  40.         task_runner.run(False)
  41.         await asyncio.sleep(0)
  42.  
  43.  
  44. if __name__ == "__main__":
  45.     asyncio.run(main())
  46.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement