Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # Simulate how a watchdog timeout and a kill signal affect a "semaphore" (a multiprocessing.Lock) shared by worker processes
- import multiprocessing
- import time
- import logging as log
- import os
- import signal
# Prefix every record with the emitting process's name and pid so the
# interleaved output of the worker and watchdog processes can be told apart.
log.basicConfig(
    level=log.INFO,
    format='%(processName)s %(process)s: %(message)s',
)
def worker(semaphore, work_duration_s, active_worker):
    """Hold the shared lock for ``work_duration_s`` seconds.

    While the lock is held, this process's pid is parked in
    ``active_worker`` and taken out again just before the lock is released.

    Args:
        semaphore: Lock-like context manager shared by all processes.
        work_duration_s: Seconds to sleep while holding the lock.
        active_worker: Queue that carries the pid of the current lock holder.
    """
    log.info('Waiting to join the pool')
    with semaphore:
        own_pid = multiprocessing.current_process().pid
        # Announce ourselves as the current holder of the lock.
        active_worker.put(own_pid)
        acquired_message = (
            'Lock acquired - working for %.1f seconds' % work_duration_s
        )
        log.info(acquired_message)
        time.sleep(work_duration_s)
        # Withdraw the announcement while the lock is still held.
        active_worker.get()
    log.info('Lock released')
def watchdog(semaphore, active_worker):
    """Poll the shared lock and interrupt a worker that holds it too long.

    Runs forever. Each round waits until some worker has announced itself
    in ``active_worker``, then tries to take the lock for ``timeout_s``
    seconds. If that fails and the announced pid is still the same one, the
    worker is sent SIGINT, its queue entry is removed and the lock is
    released on its behalf.

    Args:
        semaphore: Shared lock supporting ``acquire(timeout=...)`` and
            ``release()``.
        active_worker: Queue that carries the pid of the current lock holder.
    """
    # Function-scope import: the only name needed beyond the file's imports.
    from queue import Empty

    log.info("started")
    polling_interval_s = 0.1
    timeout_s = 2
    while True:
        # Peek at the queue with get/put. get() blocks while the queue is
        # empty, i.e. until a worker announces itself; a worker removes its
        # own entry before releasing the lock.
        previous_active_worker = active_worker.get()
        active_worker.put(previous_active_worker)
        log.info('attempting to acquire lock')
        if semaphore.acquire(timeout=timeout_s):
            log.info('no lock detected')
            semaphore.release()
        else:
            log.info('timeout exceeded')
            current_active_worker = active_worker.get()
            active_worker.put(current_active_worker)
            if current_active_worker == previous_active_worker:
                log.info('killing: %s', previous_active_worker)
                try:
                    os.kill(previous_active_worker, signal.SIGINT)
                except OSError:
                    log.info("failed to kill, process already gone?")
                # Drop the killed worker's queue entry. The timeout must be
                # passed by keyword: a positional 1 is taken as ``block``
                # and would wait forever if the worker had already removed
                # its entry before dying.
                try:
                    active_worker.get(timeout=1)
                except Empty:
                    log.info('queue already empty - nothing to clean up')
                # The interrupted worker may already have released the lock
                # while unwinding; releasing an unlocked multiprocessing
                # lock raises ValueError, which must not stop the watchdog.
                try:
                    semaphore.release()
                except ValueError:
                    log.info('lock was already released')
            else:
                log.info('locked by different processes - all good')
        time.sleep(polling_interval_s)
def start_worker(name, work_duration_s, semaphore, active_worker):
    """Start ``worker`` in a new process and return that process's pid.

    Args:
        name: Process name (appears in the log prefix).
        work_duration_s: Seconds the worker keeps the lock.
        semaphore: Lock shared with the other processes.
        active_worker: Queue that carries the pid of the current lock holder.

    Returns:
        The pid of the started process.
    """
    worker_args = (semaphore, work_duration_s, active_worker)
    process = multiprocessing.Process(
        target=worker,
        name=name,
        args=worker_args,
    )
    process.start()
    return process.pid
def main():
    """Run the two scenarios: a lock-holder timeout, then external kills.

    Starts the watchdog process first. The timeout scenario launches ten
    short workers with one slow worker in the middle; the kill scenario
    launches workers and sends them SIGINT, then starts a final worker.
    """
    # State shared by every process.
    lock = multiprocessing.Lock()
    holder_pids = multiprocessing.Queue()  # pid of the process holding the lock

    watchdog_process = multiprocessing.Process(
        target=watchdog,
        name='WATCHDOG',
        args=(lock, holder_pids),
    )
    watchdog_process.start()

    log.info("testing timeout " + "-" * 60)
    for index in range(10):
        if index == 5:
            # The slow worker is launched between worker 4 and worker 5.
            start_worker('SLOW WORKER', 3, lock, holder_pids)
        start_worker('worker %d' % index, 0.1, lock, holder_pids)
    time.sleep(10)

    log.info("testing kill " + "-" * 60)
    # NOTE(review): the pasted source lost its indentation; the sleep/kill
    # steps are assumed to run once per worker inside this loop - confirm
    # against the original script.
    for index in range(10):
        victim_pid = start_worker('worker %d' % index, 0.5, lock, holder_pids)
        time.sleep(0.5)
        log.info("killing: " + str(victim_pid))
        os.kill(victim_pid, signal.SIGINT)
    start_worker('NOT KILLED', 0.1, lock, holder_pids)
# Run the simulation only when this file is executed as a script.
if __name__ == '__main__':
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement