Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/python3
- from collections import deque
- from itertools import count
- from typing import Deque
- from simple_pipe import *
- class SillyDemoPrimeTest:
- _primes: Deque
- _p_slice: List[int]
- _counter: Iterable[int]
- def __init__(self):
- self._primes = deque((2, 3, 5, 7)) # head start
- self._p_slice = [0] * 20
- self._counter = count(8)
- def print_next20(self) -> str:
- ps = self._primes
- p_slice = self._p_slice
- counter = self._counter
- for i in range(20):
- for n in counter:
- if 0 not in (n % j for j in ps):
- ps.append(n)
- p_slice[i] = n
- break
- return str(p_slice)
- ppc1 = ProcessPairController()
- ppc2 = ProcessPairController()
- def prime_test(sender: bool):
- test = SillyDemoPrimeTest()
- wait_progress_marker = (GeneratePPCExchange.send_sync if sender else GeneratePPCExchange.receive_sync)(ppc1, ppc2)
- pab = ('Process AAA' if sender else 'Process BBB')
- for i in range(100):
- wait_progress_marker()
- print(pab, i)
- output = test.print_next20()
- wait_progress_marker()
- print(pab, output)
- threadS = Process(target = prime_test, name = "AAA", args = (True,))
- threadR = Process(target = prime_test, name = "BBB", args = (False,))
- threadS.start()
- threadR.start()
- threadS.join()
- threadR.join()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement