Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/python3
- from asyncio import Task, create_task, run, wait
- from collections import deque, namedtuple
- from random import randint
- from statistics import geometric_mean, harmonic_mean, mean, median, median_high, median_low, mode
- from typing import *
''' https://pastebin.com/ELzEaSK8 '''
# Any iterable of ints; used to annotate the data source and the consumers' input.
int_iterable = Iterable[int]
# Result type of a consumer function, constrained to int or float.
# The name passed to TypeVar must equal the variable it is assigned to,
# otherwise static type checkers reject the declaration.
_T = TypeVar('_T', int, float)
# The engine is an async generator: it is sent an int (a consumer id) and
# yields (value, try_again) pairs.
ENGINE_T = AsyncGenerator[Tuple[_T, bool], int]
async def injector(engine: ENGINE_T, qid: int) -> AsyncIterator[int]:
    """Yield the values that ``engine`` hands out to consumer ``qid``.

    Each round sends ``qid`` to the engine with ``asend`` and unpacks the reply
    as ``(value, try_again)``. A reply flagged ``try_again`` carries no value
    for this consumer, so the request is repeated; otherwise the value is
    yielded. Iteration ends when the engine raises ``StopAsyncIteration``.
    """
    # Function-scope import: ``sleep`` is not among the names this file
    # imports from asyncio at the top.
    from asyncio import sleep

    while True:
        try:
            x, try_again = await engine.asend(qid)
        except StopAsyncIteration:
            return
        if try_again:
            # Give the event loop a turn before asking again. An async
            # generator that yields without awaiting anything completes
            # ``asend`` without suspending the caller, so retrying right away
            # would spin in this task forever and no other task could ever
            # advance the engine.
            await sleep(0)
            continue
        yield x
# Signature of a consumer function: takes an iterable of ints, returns a _T.
WRAPPER_FXN_T = Callable[[Iterable[int]], _T]
def wrapper(fxn: WRAPPER_FXN_T, engine: ENGINE_T, qid: int):
    """Bind ``fxn`` to consumer ``qid`` of ``engine``.

    Returns a zero-argument coroutine function. Awaiting the coroutine it
    creates drains ``injector(engine, qid)`` and returns ``fxn`` applied to
    the collected values.
    """
    async def consume():
        # ``fxn`` is a synchronous callable, and an async generator cannot be
        # handed to ``iter()`` (that raises TypeError: 'async_generator'
        # object is not iterable). Collect the values with an async
        # comprehension first, then pass the resulting list to ``fxn``.
        values = [x async for x in injector(engine, qid)]
        return fxn(values)
    return consume
class BranchedGenerator:
    """Feed every value of one iterable to several consumer functions.

    Consumers are registered with ``new`` and identified by the integer id it
    returns. A single async generator (the "engine") walks the source iterable
    and hands the current value to each consumer id exactly once; it moves on
    to the next value only after every registered id has taken the current
    one. ``get`` returns the asyncio Task that computes a consumer's result.

    NOTE: the engine answers ``(0, True)`` ("try again") to a consumer that
    must wait for the others. A consumer that retries without giving the event
    loop a turn will starve the remaining consumers.
    """
    _it: int_iterable
    _engine: ENGINE_T
    _queue: Union[tuple, deque]
    # Tasks for all consumers, created together by the first ``get`` call.
    _tasks: Optional[Tuple[Task, ...]]
    # True once the engine has been advanced to its first ``yield``.
    _primed: bool

    def __init__(self, it: int_iterable):
        """Store the source iterable and create the (not yet started) engine."""
        self._it = it
        self._queue = deque()
        self._tasks = None
        self._primed = False
        # Creating the async generator runs none of its body; it is started
        # later from inside a task (see ``_drive``). Calling ``asyncio.wait``
        # on it here, un-awaited, only produced a never-awaited coroutine.
        self._engine = self._make_engine()

    async def _make_engine(self) -> ENGINE_T:  # it's like a server
        """Async generator serving ``(value, try_again)`` replies to consumers.

        The number of consumers is read from ``self._queue`` when the
        generator first runs, so all consumers must be registered before then.
        """
        lq = len(self._queue)
        result = try_again = 0, True
        for value in self._it:
            # Ids of the consumers that have not received ``value`` yet.
            waiting = set(range(lq))
            while True:
                qid = (yield result)
                if not waiting:
                    # Everyone has the current value: advance the source and
                    # make this caller ask again for the next one.
                    result = try_again
                    break
                if qid in waiting:
                    waiting.remove(qid)
                    result = value, False
                else:
                    # This consumer already has the current value.
                    result = try_again

    def new(self, fxn: WRAPPER_FXN_T) -> int:
        """Register ``fxn`` as a consumer and return its id."""
        qid = len(self._queue)
        self._queue.append(wrapper(fxn, self._engine, qid)())
        return qid

    def finalize(self):
        """Freeze the consumer list by converting it to a tuple."""
        self._queue = tuple(self._queue)

    async def _drive(self, consumer):
        """Start the engine if nobody has yet, then run ``consumer``."""
        if not self._primed:
            self._primed = True
            try:
                # A just-started async generator only accepts ``None``; this
                # runs it up to its first ``yield`` so consumers can send ids.
                await self._engine.asend(None)
            except StopAsyncIteration:
                # Empty source: the engine finished without yielding.
                pass
        return await consumer

    def get(self, qid: int) -> Task:
        """Return the Task computing the result of consumer ``qid``.

        Must be called while an event loop is running. The first call
        schedules tasks for all registered consumers, because the engine only
        advances once every consumer has taken the current value; scheduling
        a single consumer would leave it waiting forever.
        """
        if self._tasks is None:
            self._tasks = tuple(create_task(self._drive(c)) for c in self._queue)
        return self._tasks[qid]

    @staticmethod
    def make(it: int_iterable, fxns: Iterable[Callable[[int_iterable], _T]]) -> Tuple[_T, ...]:
        """Apply every function in ``fxns`` to the values of ``it``.

        Runs its own event loop with ``asyncio.run``, so it must be called
        from synchronous code. Returns the results in the order of ``fxns``
        (an empty tuple when ``fxns`` is empty).
        """
        async def collect():
            tmp = BranchedGenerator(it)
            qids = [tmp.new(fxn) for fxn in fxns]
            tmp.finalize()
            # A list comprehension is used because ``tuple()`` cannot consume
            # a generator expression that contains ``await``.
            return tuple([await tmp.get(qid) for qid in qids])
        return run(collect())
# Record holding a sequence (as a tuple) together with its summary statistics.
seq_stats = namedtuple(
    'seq_stats',
    [
        'tuple',
        'mean',
        'harmonic_mean',
        'geometric_mean',
        'median',
        'median_high',
        'median_low',
        'mode',
    ],
)
def bundle_bg(xs: int_iterable) -> seq_stats:
    """Compute the ``seq_stats`` of ``xs`` through a ``BranchedGenerator``.

    Runs its own event loop with ``asyncio.run``, so it must be called from
    synchronous code.
    """
    # One consumer per seq_stats field, listed in field order.
    fxns = (tuple, mean, harmonic_mean, geometric_mean, median, median_high, median_low, mode)

    async def collect() -> seq_stats:
        tmp = BranchedGenerator(xs)
        qids = [tmp.new(fxn) for fxn in fxns]
        tmp.finalize()
        # Obtain every task before awaiting any of them: the engine hands out
        # the next value only after all consumers have taken the current one,
        # so a consumer awaited on its own would never finish.
        tasks = [tmp.get(qid) for qid in qids]
        return seq_stats(*[await task for task in tasks])

    return run(collect())
def bundle(xs: int_iterable) -> seq_stats:
    """Compute the ``seq_stats`` of ``xs`` directly with the statistics module."""
    # Materialise once: ``xs`` may be a one-shot iterator, which would be
    # empty for every statistic after the first pass over it.
    data = tuple(xs)
    return seq_stats(
        data,
        mean(data),
        harmonic_mean(data),
        geometric_mean(data),
        median(data),
        median_high(data),
        median_low(data),
        mode(data),
    )
def display(v: seq_stats):
    """Print every field of ``v`` as a labelled, tab-indented report."""
    report = ''.join((
        f'Statistics of {v.tuple}:\n',
        f'\tMean: {v.mean}\n',
        f'\tHarmonic Mean: {v.harmonic_mean}\n',
        f'\tGeometric Mean: {v.geometric_mean}\n',
        f'\tMedian: {v.median}\n',
        f'\tMedian High: {v.median_high}\n',
        f'\tMedian Low: {v.median_low}\n',
        f'\tMode: {v.mode};',
    ))
    print(report)
def new(length: int, inclusive_maximum: int) -> int_iterable:
    """Return a lazy sequence of ``length`` random ints in ``[1, inclusive_maximum]``."""
    draws = range(length)
    return (randint(1, inclusive_maximum) for _draw in draws)
def test1() -> int:
    """Compare ``bundle_bg`` against ``bundle`` on ten random ints.

    Prints both results and the per-field comparison. Returns a bitmask in
    which bit ``i`` is set when field ``i`` of the two results differs, so 0
    means that every field matched.
    """
    sample = new(10, 1 << 65)
    struct1 = bundle_bg(sample)
    display(struct1)
    struct2 = bundle(struct1.tuple)
    display(struct2)
    matches = seq_stats(*(a == b for (a, b) in zip(struct1, struct2)))
    display(matches)
    # ``1 << i`` gives each field its own bit. The previous ``1 >> i`` is 0
    # for every i >= 1, which hid mismatches in all fields but the first.
    return sum(((1 << i) * (not e)) for (i, e) in enumerate(matches))
def test2() -> int:
    """Compare ``BranchedGenerator.make`` against ``bundle`` on 1000 random ints.

    Prints both results and the per-field comparison. Returns a bitmask in
    which bit ``i`` is set when field ``i`` of the two results differs.
    """
    sample = new(1000, 1 << 5)
    # ``make`` is synchronous and starts its own event loop with
    # ``asyncio.run``; it therefore cannot be awaited, nor called while
    # another loop is running, so this test is a plain function.
    # noinspection PyTypeChecker
    struct1 = seq_stats(*BranchedGenerator.make(
        sample,
        (tuple, mean, harmonic_mean, geometric_mean, median, median_high, median_low, mode),
    ))
    display(struct1)
    struct2 = bundle(struct1.tuple)
    display(struct2)
    matches = seq_stats(*(a == b for (a, b) in zip(struct1, struct2)))
    display(matches)
    # ``1 << i`` gives each field its own bit; ``1 >> i`` is 0 for i >= 1.
    return sum(((1 << i) * (not e)) for (i, e) in enumerate(matches))
def test3():
    """Placeholder test: runs an empty coroutine to completion and returns None."""
    async def noop():
        return None
    return run(noop())
if __name__ == '__main__':
    # Use test1()'s return value as the process exit status. SystemExit is
    # raised directly because the ``exit`` name is a helper installed by the
    # ``site`` module for interactive sessions and is missing when the
    # interpreter is started without it (e.g. ``python -S``).
    raise SystemExit(test1())
Advertisement
Add Comment
Please, Sign In to add comment