HasteBin0

Branching Generator Module (V3) [using threading]

Apr 28th, 2021 (edited)
154
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 5.76 KB | None | 0 0
  1. #!/usr/bin/python3
  2. from collections import namedtuple
  3. from random import randint
  4. from statistics import geometric_mean, harmonic_mean, mean, median, median_high, median_low, mode
  5. from threading import Event, Lock, Thread
  6. from typing import *
  7.  
  8. ''' https://pastebin.com/u4mTHfgc '''
  9.  
  10. int_iterable = Iterable[int]
  11. _T = TypeVar('_T1', int, float)
  12. _FXN_T = Callable[[int_iterable], _T]
  13.  
  14.  
  15. class Server:
  16.     _it: int_iterable
  17.     slots: int
  18.     edit_slots: Lock
  19.     element: _T
  20.     available: Event
  21.     zero_slots: Event
  22.     end: bool
  23.  
  24.     def __init__(self, it: int_iterable):
  25.         self._it = it
  26.         self.slots = 0
  27.         self.edit_slots = Lock()
  28.         self.available = Event()
  29.         self.zero_slots = Event()
  30.         self.end = False
  31.  
  32.     def server(self, queue_length: int):
  33.         available = self.available
  34.         zero_slots = self.zero_slots
  35.         for v in self._it:
  36.             self.slots = queue_length
  37.             self.element = v
  38.             zero_slots.clear()
  39.             available.set()
  40.             zero_slots.wait()
  41.         self.slots = queue_length
  42.         self.end = True
  43.         zero_slots.clear()
  44.         available.set()
  45.         zero_slots.wait()
  46.  
  47.     def client(self) -> int_iterable:
  48.         available = self.available
  49.         zero_slots = self.zero_slots
  50.         edit_slots = self.edit_slots
  51.         while True:
  52.             available.wait()
  53.             end = self.end
  54.             if not end:
  55.                 yield self.element
  56.             with edit_slots:
  57.                 self.slots -= 1
  58.                 if self.slots == 0:
  59.                     available.clear()
  60.                     zero_slots.set()
  61.             zero_slots.wait()
  62.             if end:
  63.                 break
  64.  
  65.  
  66. class Slot:
  67.     thread: Thread
  68.     fxn: _FXN_T
  69.     server: Server
  70.     qid: int
  71.     result: Union[Optional[_T], Exception, Tuple[Exception, Exception]]
  72.  
  73.     def __init__(self, fxn: _FXN_T, server: Server, qid: int):
  74.         self.thread = Thread(target = self.run, name = f'BG {id(self)} thread {qid}')
  75.         self.fxn = fxn
  76.         self.server = server
  77.         self.qid = qid
  78.         self.result = None
  79.  
  80.     def run(self):
  81.         client = self.server.client()
  82.         try:
  83.             self.result = self.fxn(client)
  84.         except Exception as e:
  85.             self.result = e
  86.             try:
  87.                 for _ in client:  # one thread breaking won't break it all.
  88.                     pass
  89.             except Exception as f:
  90.                 self.result = e, f
  91.  
  92.  
  93. class BranchedGenerator:
  94.     _server: Server
  95.     _queue: List[Slot]
  96.  
  97.     def __init__(self, it: int_iterable):
  98.         self._server = Server(it)
  99.         self._queue = []
  100.  
  101.     def new(self, fxn: _FXN_T) -> int:
  102.         qid = len(self._queue)
  103.         self._queue.append(Slot(fxn, self._server, qid))
  104.         return qid
  105.  
  106.     def finalize(self):
  107.         queue = self._queue
  108.         for t in queue:
  109.             t.thread.start()
  110.         self._server.server(len(queue))
  111.         for t in queue:
  112.             t.thread.join()
  113.  
  114.     def get(self, qid: int) -> _T:
  115.         return self._queue[qid].result
  116.  
  117.     @classmethod
  118.     def make(cls, it: int_iterable, fxns: Iterable[_FXN_T]) -> Tuple[_T, ...]:
  119.         tmp = cls(it)
  120.         qid_range = max(map(tmp.new, fxns))
  121.         tmp.finalize()
  122.         return tuple((tmp.get(qid)) for qid in range(qid_range + 1))
  123.  
  124.  
  125. seq_stats = namedtuple('seq_stats', ('tuple', 'mean', 'harmonic_mean', 'geometric_mean', 'median', 'median_high', 'median_low', 'mode'))
  126.  
  127.  
  128. def bundle_bg(xs: int_iterable) -> seq_stats:
  129.     tmp = BranchedGenerator(xs)
  130.     # noinspection PyTypeChecker
  131.     ys = seq_stats(
  132.         tmp.new(tuple),
  133.         tmp.new(mean),
  134.         tmp.new(harmonic_mean),
  135.         tmp.new(geometric_mean),
  136.         tmp.new(median),
  137.         tmp.new(median_high),
  138.         tmp.new(median_low),
  139.         tmp.new(mode)
  140.     )
  141.     tmp.finalize()
  142.     return seq_stats(
  143.         tmp.get(ys.tuple),
  144.         tmp.get(ys.mean),
  145.         tmp.get(ys.harmonic_mean),
  146.         tmp.get(ys.geometric_mean),
  147.         tmp.get(ys.median),
  148.         tmp.get(ys.median_high),
  149.         tmp.get(ys.median_low),
  150.         tmp.get(ys.mode)
  151.     )
  152.  
  153.  
  154. def bundle(xs: int_iterable) -> seq_stats:
  155.     return seq_stats(
  156.         tuple(xs),
  157.         mean(xs),
  158.         harmonic_mean(xs),
  159.         geometric_mean(xs),
  160.         median(xs),
  161.         median_high(xs),
  162.         median_low(xs),
  163.         mode(xs)
  164.     )
  165.  
  166.  
  167. def display(v: seq_stats):
  168.     print(f'Statistics of {v.tuple}:\n'
  169.           f'\tMean: {v.mean}\n'
  170.           f'\tHarmonic Mean: {v.harmonic_mean}\n'
  171.           f'\tGeometric Mean: {v.geometric_mean}\n'
  172.           f'\tMedian: {v.median}\n'
  173.           f'\tMedian High: {v.median_high}\n'
  174.           f'\tMedian Low: {v.median_low}\n'
  175.           f'\tMode: {v.mode};')
  176.  
  177.  
  178. def new(length: int, inclusive_maximum: int) -> int_iterable:
  179.     return (randint(1, inclusive_maximum) for _ in range(length))
  180.  
  181.  
  182. def test1() -> int:
  183.     sample = new(10, 1 << 65)
  184.     struct1 = bundle_bg(sample)
  185.     display(struct1)
  186.     struct2 = bundle(struct1.tuple)
  187.     display(struct2)
  188.     matches = seq_stats(*(a == b for (a, b) in zip(struct1, struct2)))
  189.     display(matches)
  190.     return sum(((1 >> i) * (not e)) for (i, e) in enumerate(matches))
  191.  
  192.  
  193. def test2():
  194.     sample = new(1000, 1 << 5)
  195.     struct1 = seq_stats(*BranchedGenerator.make(
  196.         sample,
  197.         (tuple, mean, harmonic_mean, geometric_mean, median, median_high, median_low, mode)
  198.     ))
  199.     display(struct1)
  200.     struct2 = bundle(struct1.tuple)
  201.     display(struct2)
  202.     matches = seq_stats(*(a == b for (a, b) in zip(struct1, struct2)))
  203.     display(matches)
  204.     return sum(((1 >> i) * (not e)) for (i, e) in enumerate(matches))
  205.  
  206.  
  207. def test3():
  208.     pass
  209.  
  210.  
  211. if __name__ == '__main__':
  212.     exit((test2()))
  213.  
Add Comment
Please, Sign In to add comment