HasteBin0

Python Simplified Multiprocessing Pipe (V5!)l

Aug 6th, 2020 (edited)
2,666
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 16.51 KB | None | 0 0
  1. #!/usr/bin/python3
  2. from collections import namedtuple
  3. from ctypes import c_bool, c_uint
  4. from io import BytesIO
  5. from itertools import cycle
  6. from multiprocessing import Event, Lock, Pipe, Process, Value
  7. from typing import Callable, Generator, Tuple, Union
  8.  
  9.  
  10. def p_value(v_type, *i_value, lock: Union[Lock, bool] = False) -> Value:
  11.     return Value(v_type, *i_value, lock = lock)
  12.  
  13.  
  14. def event_clear_wait(_ev: Event):
  15.     _ev.clear()
  16.     _ev.wait()
  17.  
  18.  
  19. def event_wait_clear(_ev: Event):
  20.     _ev.wait()
  21.     _ev.clear()
  22.  
  23.  
  24. class ProcessPairController:
  25.     _event_1: Event
  26.     _event_2: Event
  27.  
  28.     def __init__(self):
  29.         self._event_1 = Event()
  30.         self._event_2 = Event()
  31.  
  32.     @property
  33.     def x1(self) -> bool:
  34.         return self._event_1.is_set()
  35.  
  36.     @property
  37.     def x2(self) -> bool:
  38.         return self._event_2.is_set()
  39.  
  40.     def a_clear0wait1(self):
  41.         self._event_2.clear()
  42.         event_clear_wait(self._event_1)
  43.  
  44.     def a_clear0wait2(self):
  45.         self._event_1.clear()
  46.         event_clear_wait(self._event_2)
  47.  
  48.     def a_clear1wait1(self):
  49.         event_clear_wait(self._event_1)
  50.  
  51.     def a_clear2wait2(self):
  52.         event_clear_wait(self._event_2)
  53.  
  54.     def a_clear1wait2(self):
  55.         self._event_1.clear()
  56.         self._event_2.wait()
  57.  
  58.     def a_clear2wait1(self):
  59.         self._event_2.clear()
  60.         self._event_1.wait()
  61.  
  62.     def a_clear1set2(self):
  63.         self._event_1.clear()
  64.         self._event_2.set()
  65.  
  66.     def a_clear2set1(self):
  67.         self._event_2.clear()
  68.         self._event_1.set()
  69.  
  70.     def b_clear1set2wait1(self):
  71.         (e1 := self._event_1).clear()
  72.         self._event_2.set()
  73.         e1.wait()
  74.  
  75.     def b_clear2set1wait2(self):
  76.         (e2 := self._event_2).clear()
  77.         self._event_1.set()
  78.         e2.wait()
  79.  
  80.     def b_set1wait2clear2(self):
  81.         self._event_1.set()
  82.         event_wait_clear(self._event_2)
  83.  
  84.     def b_set2wait1clear1(self):
  85.         self._event_2.set()
  86.         event_wait_clear(self._event_1)
  87.  
  88.     def b_set1wait2clear1(self):
  89.         (e1 := self._event_1).set()
  90.         self._event_2.wait()
  91.         e1.clear()
  92.  
  93.     def b_set2wait1clear2(self):
  94.         (e2 := self._event_2).set()
  95.         self._event_1.wait()
  96.         e2.clear()
  97.  
  98.     def b_wait1clear1set2(self):
  99.         event_wait_clear(self._event_1)
  100.         self._event_2.set()
  101.  
  102.     def b_wait2clear2set1(self):
  103.         event_wait_clear(self._event_2)
  104.         self._event_1.set()
  105.  
  106.     def b_set1wait2(self):
  107.         self._event_1.set()
  108.         self._event_2.wait()
  109.  
  110.     def b_set2wait1(self):
  111.         self._event_2.set()
  112.         self._event_1.wait()
  113.  
  114.     def b_wait1set2(self):
  115.         self._event_1.wait()
  116.         self._event_2.set()
  117.  
  118.     def b_wait2set1(self):
  119.         self._event_2.wait()
  120.         self._event_1.set()
  121.  
  122.     def b_wait1clear1(self):
  123.         event_wait_clear(self._event_1)
  124.  
  125.     def b_wait2clear2(self):
  126.         event_wait_clear(self._event_2)
  127.  
  128.     def clear0(self):
  129.         self._event_1.clear()
  130.         self._event_2.clear()
  131.  
  132.     def clear1only(self):
  133.         self._event_1.clear()
  134.  
  135.     def clear2only(self):
  136.         self._event_2.clear()
  137.  
  138.     def set0(self):
  139.         self._event_1.set()
  140.         self._event_2.set()
  141.  
  142.     def set1only(self):
  143.         self._event_1.set()
  144.  
  145.     def set2only(self):
  146.         self._event_2.set()
  147.  
  148.     def wait0(self):
  149.         self._event_1.wait()
  150.         self._event_2.wait()
  151.  
  152.     def wait1only(self):
  153.         self._event_1.wait()
  154.  
  155.     def wait2only(self):
  156.         self._event_2.wait()
  157.  
  158.  
  159. G_NONE = Generator[None, None, None]
  160. GPE_TYPE = Callable[[], None]
  161.  
  162.  
  163. def gpe_give_next(g: Callable[[ProcessPairController, ProcessPairController], Union[G_NONE, GPE_TYPE]]) -> Callable[[ProcessPairController, ProcessPairController], GPE_TYPE]:
  164.     def inner(a: ProcessPairController, b: ProcessPairController) -> GPE_TYPE:
  165.         xg = g(a, b)
  166.         return lambda: next(xg)
  167.  
  168.     return inner
  169.  
  170.  
  171. class GeneratePPCExchange:
  172.     @staticmethod
  173.     @gpe_give_next
  174.     def send_oscillate(a: ProcessPairController, b: ProcessPairController) -> GPE_TYPE:  # FIRES FIRST!
  175.         for x in cycle((a, b)):
  176.             x.b_wait1clear1set2()
  177.             yield
  178.  
  179.     @staticmethod
  180.     @gpe_give_next
  181.     def receive_oscillate(a: ProcessPairController, b: ProcessPairController) -> GPE_TYPE:  # FIRES SECOND!
  182.         for x in cycle((a, b)):
  183.             x.b_set1wait2clear2()
  184.             yield
  185.  
  186.     """
  187.    @staticmethod
  188.    @gpe_give_next
  189.    def _template(a: ProcessPairController, b: ProcessPairController) -> GPE_TYPE:
  190.        pass
  191.    """
  192.  
  193.     @staticmethod
  194.     @gpe_give_next
  195.     def receive_sync(a: ProcessPairController, b: ProcessPairController) -> GPE_TYPE:
  196.         for x in cycle((a, b)):
  197.             x.b_set1wait2clear2()
  198.             yield
  199.  
  200.     @staticmethod
  201.     @gpe_give_next
  202.     def send_sync(a: ProcessPairController, b: ProcessPairController) -> GPE_TYPE:
  203.         for x in cycle((a, b)):
  204.             x.b_set2wait1clear1()
  205.             yield
  206.  
  207.  
  208. PPC_PAIR_T = Tuple[ProcessPairController, ProcessPairController]
  209.  
  210.  
  211. def ppc_pair() -> PPC_PAIR_T:
  212.     return ProcessPairController(), ProcessPairController()
  213.  
  214.  
  215. S_NONE = Union[str, None]
  216. P_RECEIVER_TYPE = Generator[S_NONE, None, None]
  217. CP_SENDER_TYPE = Callable[[S_NONE], bool]
  218.  
  219.  
  220. class ConnectionTicket:
  221.     _lock: Lock
  222.     _id_connection_current: Value
  223.     _id_connection_self: int
  224.  
  225.     def __init__(self):
  226.         self._lock = Lock()
  227.         self._id_connection_current = p_value(c_uint, 0, lock = Lock())
  228.         self._id_connection_self = 0
  229.  
  230.     def __enter__(self):
  231.         self._lock.__enter__()
  232.         self._id_connection_current.value = self._id_connection_self
  233.  
  234.     def __exit__(self, *blah):
  235.         self._lock.__exit__(*blah)
  236.  
  237.     @property
  238.     def seat(self) -> int:
  239.         return self._id_connection_self
  240.  
  241.     @property
  242.     def servicing(self) -> int:
  243.         return self._id_connection_current.value
  244.  
  245.     def next_idc(self) -> int:
  246.         with (idc := self._id_connection_current).get_lock():
  247.             self._id_connection_self = ni = idc.value
  248.             idc.value += 1
  249.             return ni
  250.  
  251.  
  252. sp_error_state = namedtuple('sp_error_state', ('error_memory_sender', 'error_sending_sender', 'error_memory_receiver', 'error_sending_receiver'))
  253.  
  254.  
  255. class SimplePipe:
  256.     _pipe_send: Pipe
  257.     _pipe_receive: Pipe
  258.     _initiate_exchange: PPC_PAIR_T
  259.     _sent_chunk: Event
  260.     _got_chunk: Event
  261.     _sent_a_none: Value
  262.     _last_chunk: Value
  263.     _fast_error: Value
  264.     _e_memory_error_sender: Value
  265.     _e_memory_error_receiver: Value
  266.     _e_sending_failure_sender: Value
  267.     _e_sending_failure_receiver: Value
  268.     _receiver_g: Union[P_RECEIVER_TYPE, None]
  269.     _sender_c: Union[CP_SENDER_TYPE, None]
  270.     _chunk_size_start: int
  271.     _encoding: str
  272.     _connection_ticket: ConnectionTicket
  273.  
  274.     def __init__(self, chunk_size_start: int = 10_000, encoding: str = 'UTF-8'):
  275.         self._pipe_receive, self._pipe_send = Pipe(True)
  276.         self._initiate_exchange = ppc_pair()
  277.         self._sent_chunk = Event()
  278.         self._got_chunk = Event()
  279.         self._sent_a_none = p_value(c_bool, False)
  280.         self._last_chunk = p_value(c_bool, False)
  281.         self._fast_error = p_value(c_bool, False)
  282.         self._e_memory_error_sender = p_value(c_bool, False)
  283.         self._e_memory_error_receiver = p_value(c_bool, False)
  284.         self._e_sending_failure_sender = p_value(c_bool, False)
  285.         self._e_sending_failure_receiver = p_value(c_bool, False)
  286.         self._receiver_g = self._sender_c = None
  287.         self._chunk_size_start = chunk_size_start  # ~32MillionBytes Pipe() limit
  288.         self._encoding = encoding
  289.         self._connection_ticket = ConnectionTicket()  # connection facilitation
  290.  
  291.     def init_sender(self):
  292.         def sender(i: S_NONE) -> bool:
  293.             return sender_g.send(i)
  294.  
  295.         self._sender_c = sender
  296.         next(sender_g := self._send_data())  # I <3 WALRUS OPERATOR!
  297.  
  298.     def init_receiver(self):
  299.         self._receiver_g = rg = self._receive_data()
  300.         next(rg)  # # pre-initialization
  301.  
  302.     def __del__(self):
  303.         for pipe in self._pipe_receive, self._pipe_send:
  304.             if not pipe.closed:
  305.                 pipe.close()
  306.  
  307.     @property
  308.     def receiver(self) -> P_RECEIVER_TYPE:
  309.         return self._receiver_g
  310.  
  311.     @property
  312.     def sender(self) -> CP_SENDER_TYPE:
  313.         return self._sender_c
  314.  
  315.     @property
  316.     def connection_locker(self) -> ConnectionTicket:
  317.         return self._connection_ticket
  318.  
  319.     def set_id(self) -> int:
  320.         return self._connection_ticket.next_idc()
  321.  
  322.     def _send_data(self) -> Generator[bool, S_NONE, None]:  # SENT SENDER SENDS!
  323.         p_send = self._pipe_send
  324.         sent_chunk = self._sent_chunk
  325.         got_chunk = self._got_chunk
  326.         sent_a_none = self._sent_a_none
  327.         last_chunk = self._last_chunk
  328.         fast_error = self._fast_error
  329.         memory_error = self._e_memory_error_sender
  330.         sending_failure = self._e_sending_failure_sender
  331.         start_chunk_size = self._chunk_size_start
  332.         text_encoding = self._encoding
  333.  
  334.         id_ps = get_ps_id(True, False)
  335.         initiate_exchange = GeneratePPCExchange.send_sync(*self._initiate_exchange)
  336.  
  337.         def switch_on_error(trip_this: Exception):
  338.             (memory_error if isinstance(trip_this, MemoryError) else sending_failure).value = fast_error.value = True
  339.  
  340.         while True:  #
  341.             data_to_send = (yield fast_error.value)
  342.             sent_a_none.value = din = data_to_send is None
  343.             initiate_exchange()  # Loop start
  344.             memory_error.value = sending_failure.value = fast_error.value = False  # clear error flags
  345.             last_chunk.value = False
  346.             if din:
  347.                 continue
  348.             b_data = b''
  349.             bd_index = 0
  350.             try:
  351.                 b_data = str.encode(data_to_send, text_encoding)
  352.             except (MemoryError, UnicodeEncodeError) as encoding_error:
  353.                 switch_on_error(encoding_error)
  354.             bd_length = len(b_data)
  355.             initiate_exchange()  # Load string
  356.             if fast_error.value:
  357.                 continue
  358.             chunk_size_attempt = start_chunk_size
  359.             while True:
  360.                 if bd_index + chunk_size_attempt >= bd_length:
  361.                     chunk_size = bd_length - bd_index
  362.                     last_chunk.value = True
  363.                 else:
  364.                     chunk_size = chunk_size_attempt
  365.                 try:
  366.                     p_send.send_bytes(b_data, bd_index, chunk_size)
  367.                     bd_index += chunk_size
  368.                 except (ValueError, MemoryError) as send_error:
  369.                     switch_on_error(send_error)
  370.                 sent_chunk.set()
  371.                 event_wait_clear(got_chunk)
  372.                 if last_chunk.value:
  373.                     break
  374.                 if fast_error.value:
  375.                     if chunk_size_attempt > 1:
  376.                         chunk_size_attempt >>= 1
  377.                     else:
  378.                         last_chunk.value = True
  379.  
  380.     def _receive_data(self) -> P_RECEIVER_TYPE:  # RECEIVED RECEIVER RECEIVES!
  381.         p_receive = self._pipe_receive
  382.         sent_chunk = self._sent_chunk
  383.         got_chunk = self._got_chunk
  384.         sent_a_none = self._sent_a_none
  385.         last_chunk = self._last_chunk
  386.         fast_error = self._fast_error
  387.         memory_error = self._e_memory_error_receiver
  388.         sending_failure = self._e_sending_failure_receiver
  389.         text_encoding = self._encoding
  390.  
  391.         id_ps = get_ps_id(False, False)
  392.         aggregator = BytesIO()
  393.         initiate_exchange = GeneratePPCExchange.receive_sync(*self._initiate_exchange)
  394.  
  395.         def switch_on_error(trip_this: Exception):
  396.             (memory_error if isinstance(trip_this, MemoryError) else sending_failure).value = fast_error.value = True
  397.  
  398.         yield  # pre-initialization
  399.         while True:  #
  400.             initiate_exchange()  # Loop start
  401.             memory_error.value = sending_failure.value = False  # clear error flags
  402.             if sent_a_none.value:
  403.                 yield None
  404.                 continue
  405.             initiate_exchange()  # Load string
  406.             if fast_error.value:
  407.                 continue
  408.             while True:
  409.                 event_wait_clear(sent_chunk)
  410.                 if p_receive.poll():
  411.                     try:
  412.                         aggregator.write(p_receive.recv_bytes())
  413.                     except (MemoryError, EOFError, IOError) as read_error:
  414.                         switch_on_error(read_error)
  415.                 got_chunk.set()
  416.                 if last_chunk.value:
  417.                     break
  418.             try:
  419.                 place = aggregator.tell()
  420.                 aggregator.seek(0)
  421.                 yield aggregator.read(place).decode(text_encoding)
  422.             except (MemoryError, EOFError, IOError, UnicodeDecodeError) as decode_error:
  423.                 switch_on_error(decode_error)
  424.                 yield ''
  425.             aggregator.seek(0)
  426.  
  427.     @property
  428.     def text_encoding(self) -> str:
  429.         return self._encoding
  430.  
  431.     @property
  432.     def memory_error_sender(self) -> bool:
  433.         return self._e_memory_error_sender.value
  434.  
  435.     @property
  436.     def sending_failure_sender(self) -> bool:
  437.         return self._e_sending_failure_sender.value
  438.  
  439.     @property
  440.     def memory_error_receiver(self) -> bool:
  441.         return self._e_memory_error_receiver.value
  442.  
  443.     @property
  444.     def sending_failure_receiver(self) -> bool:
  445.         return self._e_sending_failure_receiver.value
  446.  
  447.     @property
  448.     def error(self) -> bool:
  449.         return any((self._e_memory_error_sender.value, self._e_sending_failure_sender.value,
  450.                     self._e_memory_error_receiver.value, self._e_sending_failure_receiver.value))
  451.  
  452.     @property
  453.     def sending_error(self) -> bool:
  454.         return self._e_memory_error_sender.value or self._e_sending_failure_sender.value
  455.  
  456.     @property
  457.     def receiving_error(self) -> bool:
  458.         return self._e_memory_error_receiver.value or self._e_sending_failure_receiver.value
  459.  
  460.     def ask_error_state(self) -> sp_error_state:
  461.         return sp_error_state(
  462.             self._e_memory_error_sender.value, self._e_sending_failure_sender.value,
  463.             self._e_memory_error_receiver.value, self._e_sending_failure_receiver.value)
  464.  
  465.     def ask_error_state_string(self) -> str:
  466.         return ''.join(('T' if x else 'F') for x in self.ask_error_state())
  467.  
  468.  
  469. print_text_lock = Lock()
  470.  
  471.  
  472. def get_ps_id(sender: bool, tester: bool) -> str:
  473.     return ('Sender.Test:' if tester else 'Sender.Pipe:') if sender else ('Receiver.Test:' if tester else 'Receiver.Pipe:')
  474.  
  475.  
  476. def print_safe(sender_message: str, *args, **kwargs):
  477.     with print_text_lock:
  478.         print(sender_message, *args, **kwargs)
  479.  
  480.  
  481. def get_input(msg: str) -> str:
  482.     with print_text_lock:
  483.         return input(msg)
  484.  
  485.  
  486. def sp_main():
  487.     nexis = SimplePipe()
  488.  
  489.     def f_input():
  490.         nexis.init_sender()
  491.         d_send = nexis.sender
  492.         quit_phrases = ('quit', 'end', 'none')
  493.         id_ps = get_ps_id(True, True)
  494.         while True:
  495.             try:
  496.                 error = d_send(d_in := (None if (d_in := get_input('Message: ')).strip().lower() in quit_phrases else d_in))  # I <3 WALRUS OPERATOR!
  497.             except EOFError:  # input
  498.                 continue
  499.             if error:
  500.                 print_safe(id_ps, f'Error while sending {repr(d_in)} with error state {nexis.ask_error_state_string()}.')
  501.                 continue
  502.             if d_in is None:
  503.                 break
  504.         print_safe(id_ps, 'Session ended.')
  505.  
  506.     def f_print():
  507.         nexis.init_receiver()
  508.         d_receive = nexis.receiver
  509.         id_ps = get_ps_id(False, True)
  510.         print_safe(id_ps, 'Initializing.')
  511.         while True:
  512.             try:
  513.                 print_safe(id_ps, 'Receiving data...')
  514.                 text = next(d_receive)
  515.                 print_safe(id_ps, 'Message compiled.')
  516.                 if nexis.error:
  517.                     print_safe(id_ps, 'Error state', nexis.ask_error_state_string(), 'encountered.')
  518.             except MemoryError:
  519.                 print_safe(id_ps, 'Error rejoining test.')
  520.                 continue
  521.             if text is None:
  522.                 print_safe(id_ps, 'Output = None.')
  523.                 break
  524.             else:
  525.                 print_safe(id_ps, f'Output: "{text}";')
  526.         print_safe(id_ps, 'Session concluded.')
  527.  
  528.     (p_print := Process(target = f_print, name = "P_print")).start()
  529.     f_input()
  530.     p_print.join()
  531.     return
  532.  
  533.  
  534. if __name__ == '__main__':
  535.     sp_main()
  536.  
Add Comment
Please, Sign In to add comment