Advertisement
andreymal

https://ru.stackoverflow.com/questions/1525912

Jun 15th, 2023 (edited)
749
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 4.47 KB | None | 0 0
  1. import asyncio
  2. import logging
  3. from enum import Enum
  4. from typing import Any, AsyncIterable, AsyncIterator, Callable, Generator, Generic, TypeVar
  5.  
# Type of the items produced by the async iterable that `Operation` consumes.
T = TypeVar("T")

# Runs at import time: sets the root logger level to DEBUG.
logging.basicConfig(level=logging.DEBUG)
# Module-level logger used by `Operation` and the demo callbacks below.
logger = logging.getLogger(__name__)
  10.  
  11.  
class OperationState(Enum):
    """Lifecycle states reported by `Operation.state`."""

    # Set by `Operation.cancel()`.
    CANCELED = -1
    # Initial state, before `Operation.run()` is called.
    IDLE = 0
    # Set by `Operation.run()` and `Operation.resume()`.
    RUNNING = 1
    # Set by `Operation.suspend()`.
    SUSPENDED = 2
    # Set by the runner once the async iterable is exhausted.
    DONE = 3
  18.  
  19.  
  20. class InvalidOperationStateError(RuntimeError):
  21.     """Raised when an `Operation` enters an invalid state."""
  22.     pass
  23.  
  24.  
  25. class Operation(Generic[T]):
  26.     def __init__(
  27.         self,
  28.         async_iterator: AsyncIterable[T],
  29.         *,
  30.         progress_callback: Callable[[T], Any] | None = None,
  31.         done_callback: Callable[[], Any] | None = None,
  32.     ):
  33.         self._async_iterator = async_iterator
  34.         self._progress_callback = progress_callback
  35.         self._done_callback = done_callback
  36.  
  37.         self._state = OperationState.IDLE
  38.         self._resume_event = asyncio.Event()
  39.         self._runner_task: asyncio.Task[None] | None = None
  40.  
  41.     @property
  42.     def state(self) -> OperationState:
  43.         return self._state
  44.    
  45.     def run(self) -> None:
  46.         if self._state == OperationState.IDLE:
  47.             self._runner_task = asyncio.create_task(self._runner())
  48.             self._state = OperationState.RUNNING
  49.             logger.debug("Operation is runned.")
  50.         else:
  51.             raise InvalidOperationStateError("Operation is already started")
  52.  
  53.     def suspend(self) -> None:
  54.         if self._state == OperationState.RUNNING:
  55.             self._state = OperationState.SUSPENDED
  56.             self._resume_event.clear()
  57.             logger.debug("Operation is suspended.")
  58.         else:
  59.             raise InvalidOperationStateError("Operation is not running")
  60.  
  61.     def resume(self) -> None:
  62.         if self._state == OperationState.SUSPENDED:
  63.             self._state = OperationState.RUNNING
  64.             self._resume_event.set()
  65.             logger.debug("Operation is resumed.")
  66.         else:
  67.             raise InvalidOperationStateError("Operation is not suspended")
  68.  
  69.     def cancel(self) -> None:
  70.         if self._state in (OperationState.RUNNING, OperationState.SUSPENDED):
  71.             if self._runner_task is not None:
  72.                 self._runner_task.cancel()
  73.             self._state = OperationState.CANCELED
  74.             logger.debug("Operation is canceled.")
  75.         else:
  76.             raise InvalidOperationStateError("Operation is not running")
  77.  
  78.     async def wait(self) -> None:
  79.         if self._runner_task is not None:
  80.             await asyncio.wait({self._runner_task})
  81.         else:
  82.             raise InvalidOperationStateError("Operation is not started")
  83.  
  84.     async def _runner(self) -> None:
  85.         async for target in self._async_iterator:
  86.             if self._progress_callback is not None:
  87.                 self._progress_callback(target)
  88.             if self._state == OperationState.SUSPENDED:
  89.                 await self._resume_event.wait()
  90.                 self._resume_event.clear()
  91.  
  92.         self._state = OperationState.DONE
  93.         logger.debug("Operation is done.")
  94.         if self._done_callback is not None:
  95.             self._done_callback()
  96.  
  97.     def __await__(self) -> Generator[Any, None, None]:
  98.         return self.wait().__await__()
  99.  
  100.  
# ----- Usage example -----
  102.  
  103.  
  104. class ConcreteAsyncGenerator:
  105.     def __init__(self, start: int, stop: int):
  106.         self.current = start
  107.         self.start = start
  108.         self.stop = stop
  109.  
  110.     def __aiter__(self) -> AsyncIterator[int]:
  111.         return self
  112.  
  113.     async def __anext__(self) -> int:
  114.         if self.current < self.stop:
  115.             self.current += 1
  116.             await asyncio.sleep(0.1)
  117.             return self.current
  118.         raise StopAsyncIteration
  119.  
  120.  
  121. def concrete_progress_callback(process: int) -> None:
  122.     # UI, log or something update
  123.     logger.info(f"Iteration complete with: {process}...")
  124.  
  125.  
  126. def concrete_done_callback() -> None:
  127.     # result and errors handling
  128.     logger.info("All iterations complete!")
  129.  
  130.  
  131. async def main() -> None:
  132.     operation = Operation(
  133.         async_iterator=ConcreteAsyncGenerator(0, 100),
  134.         progress_callback=concrete_progress_callback,
  135.         done_callback=concrete_done_callback,
  136.     )
  137.    
  138.     operation.run()
  139.     await asyncio.sleep(1)
  140.     operation.suspend()
  141.     await asyncio.sleep(2)
  142.     operation.resume()
  143.     await asyncio.sleep(1)
  144.     # operation.cancel()
  145.  
  146.     await operation
  147.  
  148.     print("done")
  149.  
  150.  
# Run the demo only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement