Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- import time
- from typing import Any, Coroutine, Callable, TypeAlias
- T: TypeAlias = Callable[..., Coroutine[Any, Any, Any]]
- class Preprocessor:
- def __init__(self):
- self.stopping_point_for_all_decorators = True
- @staticmethod
- def launch_frequency(interval_seconds: int) -> Any:
- def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
- def wrapper(*args, **kwargs) -> Any:
- self = args[0]
- while self.stopping_point_for_all_decorators:
- func(*args, **kwargs)
- time.sleep(interval_seconds)
- return wrapper
- return decorator
- @staticmethod
- def async_launch_frequency(interval_seconds: int) -> Callable[[T], T]:
- def decorator(func: T) -> T:
- async def wrapper(*args: Any, **kwargs: Any) -> Any:
- self = args[0]
- while self.stopping_point_for_all_decorators:
- await func(*args, **kwargs)
- await asyncio.sleep(interval_seconds)
- return wrapper
- return decorator
- @async_launch_frequency(1)
- def test(self) -> None:
- print('Test')
Advertisement
Add Comment
Please, Sign In to add comment