Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import functools
- class ParsingTimeoutError(Exception):
- """Класс для Exception который вылетает при таймауте"""
- pass
- def timeout(seconds: int):
- """Декоратор для обработки таймаутов. Принимает количество секунд для таймаута"""
- def deco(func):
- @functools.wraps(func)
- def wrapper(*args, **kwargs):
- res = [ParsingTimeoutError('function [%s] timeout [%s seconds] exceeded!' % (func.__name__, seconds))]
- def new_func():
- try:
- res[0] = func(*args, **kwargs)
- except ParsingTimeoutError as e:
- res[0] = e
- t = Thread(target=new_func)
- t.daemon = True
- try:
- t.start()
- t.join(seconds)
- except Exception as je:
- print('error starting thread')
- raise je
- ret = res[0]
- if isinstance(ret, BaseException):
- raise ret
- return ret
- return wrapper
- return deco
- # И собственно применение
- @timeout(seconds=120)
- def find_new_posts() -> list:
- """Парсит RSS, сравнивает данные с базой и возвращает список неопубликованных постов"""
- код который может зависнуть во время выполнения
- return res
Add Comment
Please, Sign In to add comment