Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- from concurrent.futures import Executor
- from typing import AsyncGenerator, AsyncIterable, Generator, Iterable, TypeVar
- T = TypeVar('T')
- async def iter2asynciter(
- iterable: Iterable[T],
- *,
- loop: asyncio.AbstractEventLoop = None,
- executor: Executor = None) -> AsyncGenerator[T, None]:
- if loop is None:
- loop = asyncio.get_running_loop()
- sentinel = object()
- iterator = iter(iterable)
- future = loop.run_in_executor(executor, next, iterator, sentinel)
- while True:
- v = await future
- if v is sentinel:
- break
- future = loop.run_in_executor(executor, next, iterator, sentinel)
- yield v
- def asynciter2iter(
- asynciterable: AsyncIterable[T],
- *,
- loop: asyncio.AbstractEventLoop) -> Generator[T, None, None]:
- asynciterator = asynciterable.__aiter__()
- future = asyncio.run_coroutine_threadsafe(asynciterator.__anext__(), loop)
- while True:
- try:
- v = future.result()
- except StopAsyncIteration:
- break
- else:
- future = asyncio.run_coroutine_threadsafe(asynciterator.__anext__(), loop)
- yield v
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement