Advertisement
HasteBin0

simple_pipe_test_gpe_synchronization

Sep 11th, 2020 (edited)
1,746
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.43 KB | None | 0 0
  1. #!/usr/bin/python3
  2. from collections import deque
  3. from itertools import count
  4. from typing import Deque
  5.  
  6. from simple_pipe import *
  7.  
  8.  
  9. class SillyDemoPrimeTest:
  10.     _primes: Deque
  11.     _p_slice: List[int]
  12.     _counter: Iterable[int]
  13.  
  14.     def __init__(self):
  15.         self._primes = deque((2, 3, 5, 7))  # head start
  16.         self._p_slice = [0] * 20
  17.         self._counter = count(8)
  18.  
  19.     def print_next20(self) -> str:
  20.         ps = self._primes
  21.         p_slice = self._p_slice
  22.         counter = self._counter
  23.         for i in range(20):
  24.             for n in counter:
  25.                 if 0 not in (n % j for j in ps):
  26.                     ps.append(n)
  27.                     p_slice[i] = n
  28.                     break
  29.         return str(p_slice)
  30.  
  31.  
  32. ppc1 = ProcessPairController()
  33. ppc2 = ProcessPairController()
  34.  
  35.  
  36. def prime_test(sender: bool):
  37.     test = SillyDemoPrimeTest()
  38.     wait_progress_marker = (GeneratePPCExchange.send_sync if sender else GeneratePPCExchange.receive_sync)(ppc1, ppc2)
  39.     pab = ('Process AAA' if sender else 'Process BBB')
  40.     for i in range(100):
  41.         wait_progress_marker()
  42.         print(pab, i)
  43.         output = test.print_next20()
  44.         wait_progress_marker()
  45.         print(pab, output)
  46.  
  47.  
  48. threadS = Process(target = prime_test, name = "AAA", args = (True,))
  49. threadR = Process(target = prime_test, name = "BBB", args = (False,))
  50. threadS.start()
  51. threadR.start()
  52. threadS.join()
  53. threadR.join()
  54.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement