Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """Render HTML for scraping"""
- # -*- coding: utf-8 -*-
- import os
- import sys
- from contextlib import contextmanager
- from multiprocessing import Pool
- import lxml.html
- try:
- TimeoutError
- except NameError:
- from multiprocessing import TimeoutError # Python 2
def _render(source_html):
    """Load a page in Qt WebEngine and return its HTML once events are present.

    Args:
        source_html: Address of the page to load. Despite its name, the value
            is wrapped in ``QUrl`` and loaded, i.e. it is treated as a URL.

    Returns:
        The page HTML reported by ``toHtml`` for the first snapshot in which
        the ``events-result-set`` container has at least one child element.

    This function blocks until such a snapshot is seen and has no timeout of
    its own, so it should be run where a hang can be bounded from outside.
    """
    from PyQt5.QtCore import QEventLoop, QTimer, QUrl
    from PyQt5.QtWebEngineWidgets import QWebEngineView
    from PyQt5.QtWidgets import QApplication

    # Children of the results container; any match means the page is ready.
    events_xpath = '//*//div[@class="events-result-set"]/*'
    # How often the page is re-read while waiting for the events to appear.
    poll_interval_ms = 500

    class Render(QWebEngineView):
        """Render HTML with PyQt5 WebEngine."""

        def __init__(self, url):
            self.html = None
            self._unchecked = False  # True when self.html has not been parsed yet
            self._found = False      # result of the most recent parse
            # The application object has to exist before the view is built.
            self.app = QApplication(sys.argv)
            QWebEngineView.__init__(self)

            # Content filled in by scripts may arrive after loadFinished, so
            # a single snapshot is not enough: keep polling the page until the
            # events show up.
            self._poll_timer = QTimer(self)
            self._poll_timer.timeout.connect(self._request_html)

            self.loadFinished.connect(self._loadFinished)
            self.load(QUrl(url))
            self.show()

            wait_flags = (
                QEventLoop.ExcludeUserInputEvents |
                QEventLoop.ExcludeSocketNotifiers |
                QEventLoop.WaitForMoreEvents)
            while not self._events_in_html():
                self.app.processEvents(wait_flags)
            self._poll_timer.stop()
            self.app.quit()

        def _events_in_html(self):
            """Return True once the latest snapshot contains event nodes."""
            if self._unchecked:
                # Parse each snapshot only once instead of on every loop turn.
                self._unchecked = False
                snapshot = self.html
                if snapshot and snapshot.strip():
                    nodes = lxml.html.fromstring(snapshot).xpath(events_xpath)
                    self._found = len(nodes) > 0
                else:
                    # An empty document cannot be parsed; keep waiting.
                    self._found = False
            return self._found

        def _callable(self, data):
            """Receive the HTML delivered asynchronously by ``toHtml``."""
            self.html = data
            self._unchecked = True

        def _request_html(self):
            """Ask the page for its current HTML; the result comes via _callable."""
            self.page().toHtml(self._callable)

        def _loadFinished(self, result):
            """Take a first snapshot and start polling for later changes."""
            self._request_html()
            self._poll_timer.start(poll_interval_ms)

    return Render(source_html).html
@contextmanager
def devnull():
    """Temporarily redirect stdout and stderr to /dev/null.

    The redirection is done on the underlying file descriptors of
    ``sys.stderr`` and ``sys.stdout``, so output written straight to those
    descriptors is silenced as well. On exit the original descriptors are
    restored and the temporary duplicates are closed.

    Raises:
        Whatever ``fileno()``, ``os.dup`` or ``os.dup2`` raise when a stream
        has no usable descriptor; any stream already redirected is restored
        before the error propagates.
    """
    # One (stream, target fd, backup fd) entry per stream redirected so far,
    # so cleanup only undoes what was actually done.
    redirected = []
    null = None
    try:
        null = open(os.devnull, 'w')
        for stream in (sys.stderr, sys.stdout):
            # Text buffered before the block must still reach its real target.
            stream.flush()
            target_fd = stream.fileno()
            backup_fd = os.dup(target_fd)
            redirected.append((stream, target_fd, backup_fd))
            os.dup2(null.fileno(), target_fd)
        yield
    finally:
        for stream, target_fd, backup_fd in reversed(redirected):
            try:
                # Push text buffered inside the block to /dev/null so it does
                # not show up after the descriptors are restored.
                stream.flush()
            except (IOError, OSError, ValueError):
                # Best effort: the stream may have been closed by the caller.
                pass
            os.dup2(backup_fd, target_fd)
            os.close(backup_fd)  # the duplicate is no longer needed
        if null is not None:
            null.close()
def render(html, tries=3, timeout=120):
    """Perform render in a new process to prevent hangs.

    Each attempt runs ``_render`` in a fresh single-worker pool; the pool is
    terminated after every attempt, which also kills a worker that is stuck.

    Args:
        html: Value forwarded unchanged to ``_render``.
        tries: Maximum number of attempts.
        timeout: Seconds to wait for one attempt before giving up on it.

    Returns:
        Whatever ``_render`` returns on the first attempt that finishes in time.

    Raises:
        TimeoutError: If every attempt timed out.
    """
    # A pool result reports a timeout with multiprocessing.TimeoutError. On
    # Python 3 that class is unrelated to the builtin TimeoutError, so it has
    # to be named explicitly or timed-out attempts are never retried.
    from multiprocessing import TimeoutError as PoolTimeoutError

    for _ in range(tries):
        pool = Pool(1)
        try:
            return pool.apply_async(_render, args=(html,)).get(timeout=timeout)
        except (PoolTimeoutError, TimeoutError):
            continue
        finally:
            pool.terminate()
    raise TimeoutError('Timed out attempting to render HTML %d times' % tries)
Add Comment
Please, Sign In to add comment