Guest User

Untitled

a guest
Oct 19th, 2018
91
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.36 KB | None | 0 0
# Excerpts repeated from the full listing further down: the signal
# declaration, the emit call that reports a result, and the comprehension
# that submits one load_url job per URL. They are not runnable on their own.
  1. about_check_url = pyqtSignal(str)
  2.  
  3. MyAsyncCheckUrls.about_check_url.emit('{}------{}'.format(url, code))
  4.  
  5. future_to_url = {executor.submit(load_url, url, 60): url for url in MyAsyncCheckUrls.using_urls}
  6.  
  7. class MyAsyncCheckUrls(QThread):
  8. about_check_url = pyqtSignal(str) # Проверка ответов
  9. good_requested_url = pyqtSignal(str) # Запись хороших ответов
  10. bad_requested_url = pyqtSignal(str) # Запись плохих ответов
  11. status_bar_info = pyqtSignal(int) # Контроль прогресс-бара
  12.  
  13. def __init__(self,urls):
  14. super().__init__()
  15. using_urls = urls
  16.  
  17.  
  18.  
  19. def load_url(url):
  20. with requests.get(url, allow_redirects=False) as shit:
  21. return shit.status_code()
  22.  
  23.  
  24. with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
  25. future_to_url = {executor.submit(load_url, url, 60): url for url in MyAsyncCheckUrls.using_urls}
  26. for future in concurrent.futures.as_completed(future_to_url):
  27. url = future_to_url[future]
  28. try:
  29. data = future.result()
  30. except Exception as exc:
  31. load_url(url).code = str(exc)
  32. else:
  33. code = load_url(url).shit.status_code
  34. MyAsyncCheckUrls.about_check_url.emit('{}------{}'.format(url, code))
Add Comment
Please, Sign In to add comment