Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
# NOTE(review): the three lines below appear to be excerpts repeated verbatim
# from the class definition that follows (the signal declaration, the emit
# call and the executor submit) -- presumably paste residue rather than
# standalone statements; confirm before relying on them.
- about_check_url = pyqtSignal(str)
- MyAsyncCheckUrls.about_check_url.emit('{}------{}'.format(url, code))
- future_to_url = {executor.submit(load_url, url, 60): url for url in MyAsyncCheckUrls.using_urls}
class MyAsyncCheckUrls(QThread):
    """Worker thread that requests a list of URLs concurrently.

    For every URL, ``about_check_url`` is emitted with the text
    ``'<url>------<result>'`` where ``<result>`` is the HTTP status code, or
    the exception message when the request failed.
    """

    about_check_url = pyqtSignal(str)  # Response check result
    # The three signals below are declared for consumers but are not emitted
    # anywhere in this class.
    good_requested_url = pyqtSignal(str)  # Recording of good responses
    bad_requested_url = pyqtSignal(str)  # Recording of bad responses
    status_bar_info = pyqtSignal(int)  # Progress bar control

    def __init__(self, urls):
        """Store the URLs to check.

        Args:
            urls: Iterable of URL strings to request.
        """
        super().__init__()
        # Keep the URLs on the instance (a bare local would be discarded) and
        # copy them so later changes by the caller do not affect this thread.
        self.using_urls = list(urls)

    @staticmethod
    def load_url(url, timeout=60):
        """Request ``url`` without following redirects.

        Args:
            url: Address to request.
            timeout: Seconds to wait for the server before giving up.

        Returns:
            The integer HTTP status code of the response.

        Raises:
            requests.RequestException: If the request cannot be completed.
        """
        with requests.get(url, allow_redirects=False, timeout=timeout) as response:
            # ``status_code`` is a plain attribute, not a method.
            return response.status_code

    def run(self):
        """Check all stored URLs and emit one ``about_check_url`` per URL.

        ``QThread.start()`` invokes this method on the worker thread.
        """
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            future_to_url = {
                executor.submit(self.load_url, url, 60): url
                for url in self.using_urls
            }
            for future in concurrent.futures.as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    # Reuse the result already fetched by the pool instead of
                    # issuing a second request.
                    code = future.result()
                except Exception as exc:
                    # Thread boundary: report the failure for this URL rather
                    # than letting one bad host abort the whole run.
                    code = str(exc)
                # Signals must be emitted through the instance; the unbound
                # class attribute has no ``emit``.
                self.about_check_url.emit('{}------{}'.format(url, code))
Add Comment
Please, Sign In to add comment