Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/python3
- from collections import namedtuple
- from random import randint
- from statistics import geometric_mean, harmonic_mean, mean, median, median_high, median_low, mode
- from threading import Event, Lock, Thread
- from typing import *
- ''' https://pastebin.com/u4mTHfgc '''
# Any iterable of integers: the kind of stream that is shared between consumers.
int_iterable = Iterable[int]
# Numeric result type. The string passed to TypeVar has to match the variable it
# is assigned to, otherwise static type checkers reject the declaration.
_T = TypeVar('_T', int, float)
# Signature of a consumer function: it receives the integer stream and returns
# a value (annotated as numeric here).
_FXN_T = Callable[[int_iterable], _T]
class Server:
    """Feed one iterable to several consumer threads in lock step.

    The producer (``server``) publishes one element per round and blocks until
    every consumer has acknowledged it.  Each consumer iterates ``client()``,
    which yields the published element, acknowledges the round and then waits
    for the round to complete before looking for the next element.

    Every round gets its own completion event.  A consumer keeps a reference
    to the event of the round it is acknowledging, so a consumer that is slow
    to start waiting can never end up waiting on an event that was already
    reset for the following round.
    """

    _it: 'int_iterable'
    slots: int          # consumers that still have to acknowledge the current round
    edit_slots: Lock    # guards the decrement of ``slots``
    element: '_T'       # element published for the current round
    available: Event    # set while a round is open for consumers
    zero_slots: Event   # completion event of the current round (replaced every round)
    end: bool           # True once the end-of-stream round has been published

    def __init__(self, it: 'int_iterable'):
        """Store the source iterable; nothing is consumed until ``server`` runs."""
        self._it = it
        self.slots = 0
        self.edit_slots = Lock()
        self.available = Event()
        self.zero_slots = Event()
        self.end = False

    def _run_round(self, queue_length: int) -> None:
        """Open one round and block until ``queue_length`` consumers acknowledged it."""
        done = Event()
        self.slots = queue_length
        self.zero_slots = done
        # All round state is in place before consumers are released.
        self.available.set()
        done.wait()

    def _close(self, queue_length: int) -> None:
        """Publish the end-of-stream round so that every consumer terminates."""
        self.end = True
        self._run_round(queue_length)

    def server(self, queue_length: int):
        """Publish every element of the source, then an end-of-stream marker.

        Blocks the calling thread until all consumers have seen the whole
        stream.  ``queue_length`` is the number of consumers iterating
        ``client()``.  With no consumers (``queue_length <= 0``) there is
        nobody to acknowledge a round, so the call returns immediately without
        touching the source.

        If the source raises, the consumers are released with an end-of-stream
        marker first and the exception is then re-raised to the caller.
        """
        if queue_length <= 0:
            return
        source = iter(self._it)
        while True:
            try:
                value = next(source)
            except StopIteration:
                break
            except BaseException:
                # No round is open here, so closing the stream is safe and
                # keeps the consumer threads from blocking forever.
                self._close(queue_length)
                raise
            self.element = value
            self._run_round(queue_length)
        self._close(queue_length)

    def client(self) -> 'int_iterable':
        """Yield the published elements, one per round, until end of stream.

        The generator must be iterated to exhaustion: a consumer that stops
        early never acknowledges later rounds and stalls everyone else.
        """
        available = self.available
        edit_slots = self.edit_slots
        while True:
            available.wait()
            # Capture this round's completion event while the round is open.
            done = self.zero_slots
            end = self.end
            if not end:
                yield self.element
            with edit_slots:
                self.slots -= 1
                if self.slots == 0:
                    # Last acknowledgement: close the round before waking anyone.
                    available.clear()
                    done.set()
            done.wait()
            if end:
                break
class Slot:
    """One consumer: a thread that applies ``fxn`` to the server's stream.

    After ``run`` finishes, ``result`` holds the value returned by ``fxn``,
    the exception it raised, an exception raised while draining the stream,
    or the pair ``(fxn exception, drain exception)`` when both failed.
    """

    thread: Thread
    fxn: '_FXN_T'
    server: 'Server'
    qid: int
    result: 'Union[Optional[_T], Exception, Tuple[Exception, Exception]]'

    def __init__(self, fxn: '_FXN_T', server: 'Server', qid: int):
        """Create the (not yet started) worker thread for ``fxn``."""
        self.thread = Thread(target=self.run, name=f'BG {id(self)} thread {qid}')
        self.fxn = fxn
        self.server = server
        self.qid = qid
        self.result = None

    def run(self):
        """Apply ``fxn`` to the stream and always consume the stream to its end.

        The stream is drained whether ``fxn`` succeeded or failed: a consumer
        that stops reading early would otherwise leave the remaining elements
        unread and hold up whoever is feeding the stream.
        """
        client = self.server.client()
        failure = None
        try:
            self.result = self.fxn(client)
        except Exception as e:
            failure = e
            self.result = e
        try:
            for _ in client:  # one thread breaking won't break it all.
                pass
        except Exception as f:
            self.result = f if failure is None else (failure, f)
class BranchedGenerator:
    """Run several consumer functions over a single pass of one iterable.

    Register consumers with ``new``, call ``finalize`` once to run them all,
    then read each outcome with ``get``.  ``make`` does all three in one call.
    """

    _server: Server
    _queue: List[Slot]

    def __init__(self, it: int_iterable):
        """Wrap ``it`` in a ``Server``; no consumer is registered yet."""
        self._server = Server(it)
        self._queue = []

    def new(self, fxn: _FXN_T) -> int:
        """Register ``fxn`` as a consumer and return its id for ``get``."""
        qid = len(self._queue)
        self._queue.append(Slot(fxn, self._server, qid))
        return qid

    def finalize(self):
        """Start every consumer thread, feed them the stream and join them.

        Blocks until all consumers have finished.
        """
        queue = self._queue
        if not queue:
            # With no consumers there is nothing to start, feed or join.
            return
        for t in queue:
            t.thread.start()
        self._server.server(len(queue))
        for t in queue:
            t.thread.join()

    def get(self, qid: int) -> _T:
        """Return the stored result of consumer ``qid`` (``None`` before ``finalize``)."""
        return self._queue[qid].result

    @classmethod
    def make(cls, it: int_iterable, fxns: Iterable[_FXN_T]) -> Tuple[_T, ...]:
        """Apply every function in ``fxns`` to ``it`` and return their results.

        The results are in the order of ``fxns``; an empty ``fxns`` yields an
        empty tuple.
        """
        tmp = cls(it)
        # Keep the ids themselves rather than deriving them from max(), which
        # fails when no function was given.
        qids = [tmp.new(fxn) for fxn in fxns]
        tmp.finalize()
        return tuple(tmp.get(qid) for qid in qids)
# Record holding a sequence (``tuple``) next to one field per statistics function.
seq_stats = namedtuple(
    'seq_stats',
    'tuple mean harmonic_mean geometric_mean median median_high median_low mode',
)
def bundle_bg(xs: int_iterable) -> seq_stats:
    """Compute all ``seq_stats`` fields over ``xs`` through a ``BranchedGenerator``.

    One consumer is registered per field, in field order, then all of them are
    run and their results collected into a ``seq_stats``.
    """
    reducers = (tuple, mean, harmonic_mean, geometric_mean,
                median, median_high, median_low, mode)
    branches = BranchedGenerator(xs)
    # noinspection PyTypeChecker
    handles = seq_stats._make(branches.new(reducer) for reducer in reducers)
    branches.finalize()
    return seq_stats._make(branches.get(handle) for handle in handles)
def bundle(xs: int_iterable) -> seq_stats:
    """Compute all ``seq_stats`` fields over ``xs`` directly.

    ``xs`` is materialised once, so one-shot iterators (generators, ``map``
    objects, ...) work as well as re-iterable containers.  Raises whatever the
    ``statistics`` functions raise for the data (e.g. for an empty input).
    """
    data = tuple(xs)
    return seq_stats(
        data,
        mean(data),
        harmonic_mean(data),
        geometric_mean(data),
        median(data),
        median_high(data),
        median_low(data),
        mode(data)
    )
def display(v: seq_stats):
    """Print every field of ``v`` as a labelled report, one tab-indented line per statistic."""
    report = (
        f'Statistics of {v.tuple}:',
        f'\tMean: {v.mean}',
        f'\tHarmonic Mean: {v.harmonic_mean}',
        f'\tGeometric Mean: {v.geometric_mean}',
        f'\tMedian: {v.median}',
        f'\tMedian High: {v.median_high}',
        f'\tMedian Low: {v.median_low}',
        f'\tMode: {v.mode};',
    )
    print('\n'.join(report))
def new(length: int, inclusive_maximum: int) -> int_iterable:
    """Return a lazy stream of ``length`` random integers from 1 to ``inclusive_maximum`` inclusive."""
    draws = range(length)
    return (randint(1, inclusive_maximum) for _unused in draws)
def test1() -> int:
    """Compare ``bundle_bg`` with ``bundle`` on 10 random integers up to ``1 << 65``.

    Prints both results and the per-field comparison.  Returns a bit mask in
    which bit ``i`` is set when field ``i`` differs, so 0 means full agreement.
    """
    sample = new(10, 1 << 65)
    struct1 = bundle_bg(sample)
    display(struct1)
    struct2 = bundle(struct1.tuple)
    display(struct2)
    matches = seq_stats(*(a == b for (a, b) in zip(struct1, struct2)))
    display(matches)
    # Shift left: every mismatching field needs its own bit in the result.
    return sum((1 << i) for (i, same) in enumerate(matches) if not same)
def test2() -> int:
    """Compare ``BranchedGenerator.make`` with ``bundle`` on 1000 random integers up to ``1 << 5``.

    Prints both results and the per-field comparison.  Returns a bit mask in
    which bit ``i`` is set when field ``i`` differs, so 0 means full agreement.
    """
    sample = new(1000, 1 << 5)
    struct1 = seq_stats(*BranchedGenerator.make(
        sample,
        (tuple, mean, harmonic_mean, geometric_mean, median, median_high, median_low, mode)
    ))
    display(struct1)
    struct2 = bundle(struct1.tuple)
    display(struct2)
    matches = seq_stats(*(a == b for (a, b) in zip(struct1, struct2)))
    display(matches)
    # Shift left: every mismatching field needs its own bit in the result.
    return sum((1 << i) for (i, same) in enumerate(matches) if not same)
def test3():
    """Placeholder for a further test; currently does nothing."""
    return None
if __name__ == '__main__':
    # The process exit status is the value returned by test2().  SystemExit is
    # raised directly because the ``exit`` helper is installed by the ``site``
    # module for interactive use and is not guaranteed to exist in programs.
    raise SystemExit(test2())
Add Comment
Please, Sign In to add comment