Advertisement
Guest User

Untitled

a guest
Mar 4th, 2015
193
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.32 KB | None | 0 0
  1. # simulate timeout and kill effect on semaphore
  2.  
  3. import multiprocessing
  4. import time
  5. import logging as log
  6. import os
  7. import signal
  8.  
  9. log.basicConfig(level=log.INFO,
  10.                 format='%(processName)s %(process)s: %(message)s')
  11.  
  12.  
  13. def worker(semaphore, work_duration_s, active_worker):
  14.     log.info('Waiting to join the pool')
  15.     with semaphore:
  16.         active_worker.put(multiprocessing.current_process().pid)
  17.         log.info('Lock acquired - working for %.1f seconds' % work_duration_s)
  18.         time.sleep(work_duration_s)
  19.         active_worker.get()
  20.     log.info('Lock released')
  21.  
  22.  
  23. def watchdog(semaphore, active_worker):
  24.     log.info("started")
  25.     polling_interval_s = 0.1
  26.     timeout_s = 2
  27.     while True:
  28.         # get/put is safe because get blocks if queue is empty, and worker gets what he put before releasing lock
  29.         # if queue is empty, this will block until worker puts something into queue
  30.         previous_active_worker = active_worker.get()
  31.         active_worker.put(previous_active_worker)
  32.         log.info('attempting to acquire lock')
  33.         lock_acquired = semaphore.acquire(timeout=timeout_s)
  34.         if not lock_acquired:
  35.             log.info('timeout exceeded')
  36.             current_active_worker = active_worker.get()
  37.             active_worker.put(current_active_worker)
  38.             if current_active_worker == previous_active_worker:
  39.                 log.info('killing: ' + str(previous_active_worker))
  40.                 # kill process and release semaphore
  41.                 try:
  42.                     os.kill(previous_active_worker, signal.SIGINT)
  43.                 except OSError:
  44.                     log.info("failed to kill, process already gone?")
  45.                     pass
  46.                 active_worker.get(1)  # emtpy queue, use timeout in case worker was killed after cleaning up queue
  47.                 semaphore.release()
  48.             else:
  49.                 log.info('locked by different processes - all good')
  50.         else:
  51.             log.info('no lock detected')
  52.             semaphore.release()
  53.         time.sleep(polling_interval_s)
  54.  
  55.  
  56. def start_worker(name, work_duration_s, semaphore, active_worker):
  57.     t = multiprocessing.Process(target=worker, name=name,
  58.                                 args=(semaphore, work_duration_s, active_worker))
  59.     t.start()
  60.     return t.pid
  61.  
  62. def main():
  63.     # shared/global variables
  64.     semaphore = multiprocessing.Lock()
  65.     active_worker = multiprocessing.Queue()  # stores pid which is holding lock
  66.  
  67.     t_watchdog = multiprocessing.Process(target=watchdog, name='WATCHDOG',
  68.                                          args=(semaphore, active_worker))
  69.     t_watchdog.start()
  70.  
  71.     log.info("testing timeout " + "-" * 60)
  72.     for i in range(5):
  73.         start_worker('worker ' + str(i), 0.1, semaphore, active_worker)
  74.     start_worker('SLOW WORKER', 3, semaphore, active_worker)
  75.     for i in range(5, 10):
  76.         start_worker('worker ' + str(i), 0.1, semaphore, active_worker)
  77.  
  78.     time.sleep(10)
  79.     log.info("testing kill " + "-" * 60)
  80.     for i in range(10):
  81.         pid = start_worker('worker ' + str(i), 0.5, semaphore, active_worker)
  82.         time.sleep(0.5)
  83.         log.info("killing: " + str(pid))
  84.         os.kill(pid, signal.SIGINT)
  85.     start_worker('NOT KILLED', 0.1, semaphore, active_worker)
  86.  
  87.  
  88. if __name__ == '__main__':
  89.     main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement