SHARE
TWEET

Working sentinel example with Thread and Process

DeaD_EyE Aug 4th, 2019 (edited) 72 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. from random import random
  2. import time
  3. import threading
  4. import multiprocessing
  5. import queue
  6.  
  7.  
  8. class Sentinel:
  9.     def __init__(self, name):
  10.         self._name = name
  11.     def __eq__(self, other):
  12.         if not hasattr(other, '_name'):
  13.             return False
  14.         return self._name == other._name
  15.     def __repr__(self):
  16.         return f'{self._name} sentinel'
  17.  
  18.  
  19. END_THREAD = Sentinel('END_THREAD')
  20. END_PROCESS = Sentinel('END_PROCESS')
  21. FINISHED = Sentinel('FINISHED')
  22.  
  23.  
  24. def random_sleep():
  25.     t = random()
  26.     time.sleep(t)
  27.  
  28.  
  29. def thread_worker(in_q, out_q):
  30.     for element in iter(in_q.get, END_THREAD):
  31.         random_sleep()
  32.         print(f'Thread Worker put: {element}')
  33.         out_q.put(element)
  34.     print('Thread worker finished')
  35.     out_q.put(FINISHED)
  36.  
  37.  
  38. def process_worker(in_q, out_q):
  39.     for element in iter(in_q.get, END_PROCESS):
  40.         random_sleep()
  41.         print(f'Process Worker put: {element}')
  42.         out_q.put(element)
  43.     print('Process worker finished')
  44.     out_q.put(FINISHED)
  45.  
  46.  
  47. def process_results(in_q):
  48.     for element in iter(in_q.get, FINISHED):
  49.         random_sleep()
  50.         print('Processing results', element)
  51.     print('Processing results finished')
  52.  
  53.  
  54. thread_q = queue.Queue()
  55. process_q = multiprocessing.Queue()
  56. thread_results_q = queue.Queue()
  57. process_results_q = multiprocessing.Queue()
  58.  
  59. # start all threads
  60. threading.Thread(target=thread_worker, args=[thread_q, thread_results_q]).start()
  61. multiprocessing.Process(target=process_worker, args=[process_q, process_results_q]).start()
  62.  
  63. # start two consumers, one as process one as thread
  64. multiprocessing.Process(target=process_results, args=[process_results_q]).start()
  65. threading.Thread(target=process_results, args=[thread_results_q]).start()
  66.  
  67.  
  68. # put tasks in the queue
  69. [thread_q.put(f'Thread Task {n}') for n in range(1,6)]
  70. [process_q.put(f'Process Task {n}') for n in range(10,16)]
  71.  
  72. # thread_q.put(END_PROCESS) # wrong sentinel
  73. # process_q.put(END_THREAD) # wrong sentinel
  74.  
  75. # put stop sentinels into the worker queues
  76. thread_q.put(END_THREAD) # right sentinel
  77. process_q.put(END_PROCESS) # right sentinel
  78.  
  79. # they emit a FINISHED sentinel to the consumer threads
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top