xm4dn355x

timeout decorator

Apr 8th, 2021 (edited)
330
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.51 KB | None | 0 0
  1. import functools
  2.  
  3.  
  4. class ParsingTimeoutError(Exception):
  5.     """Класс для Exception который вылетает при таймауте"""
  6.     pass
  7.  
  8.  
  9. def timeout(seconds: int):
  10.     """Декоратор для обработки таймаутов. Принимает количество секунд для таймаута"""
  11.     def deco(func):
  12.         @functools.wraps(func)
  13.         def wrapper(*args, **kwargs):
  14.             res = [ParsingTimeoutError('function [%s] timeout [%s seconds] exceeded!' % (func.__name__, seconds))]
  15.  
  16.             def new_func():
  17.                 try:
  18.                     res[0] = func(*args, **kwargs)
  19.                 except ParsingTimeoutError as e:
  20.                     res[0] = e
  21.             t = Thread(target=new_func)
  22.             t.daemon = True
  23.             try:
  24.                 t.start()
  25.                 t.join(seconds)
  26.             except Exception as je:
  27.                 print('error starting thread')
  28.                 raise je
  29.             ret = res[0]
  30.             if isinstance(ret, BaseException):
  31.                 raise ret
  32.             return ret
  33.         return wrapper
  34.     return deco
  35.  
  36. # И собственно применение
  37. @timeout(seconds=120)
  38. def find_new_posts() -> list:
  39.     """Парсит RSS, сравнивает данные с базой и возвращает список неопубликованных постов"""
  40.     код который может зависнуть во время выполнения
  41.     return res
Add Comment
Please, Sign In to add comment