Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
def grab_title(url):
    """Kick off title fetching for *url*.

    In testing mode the title is fetched synchronously and returned
    directly.  Otherwise the fetch is deferred to a background greenlet
    and the response carries a token the client can listen on via
    socketio to receive the title once it arrives.
    """
    if config.app.testing:
        # Synchronous path keeps tests deterministic.
        return jsonify(grab_title_async(current_app, url))
    # Defer the fetch; the result is pushed over socketio under this token.
    token = f"title-{uuid.uuid4()}"
    gevent.spawn(
        send_title_grab_async, current_app._get_current_object(), url, token
    )
    return jsonify(status="deferred", token=token)
def send_title_grab_async(app, url, token):
    """Fetch the title for *url* and push it over socketio to listeners
    waiting on *token*."""
    payload = grab_title_async(app, url)
    payload["target"] = token
    with app.app_context():
        send_deferred_event("grab_title", token, payload)
def grab_title_async(app, url):
    """Fetch *url* and extract the text of its ``<title>`` element.

    Returns ``{"status": "ok", "title": ...}`` on success, or
    ``{"status": "error"}`` on any fetch or parse failure.
    """
    with app.app_context():
        try:
            resp, data = safe_request(
                url, max_size=500000, mimetypes={"text/html"}, partial_read=True
            )
            # Keep only markup up to </title>: less HTML means less parsing.
            cut = data.find(b"</title>")
            if cut < 0:
                raise ValueError
            snippet = data[:cut] + b"</title></head><body></body>"
            # Honour the charset advertised in the Content-Type header.
            _, params = cgi.parse_header(resp.headers.get("Content-Type", ""))
            encoding = params.get("charset", "utf-8")
            soup = BeautifulSoup(snippet, "lxml", from_encoding=encoding)
            title = soup("title")[0].text.strip(WHITESPACE)
            # Drop the suffix YouTube appends to every page title.
            return {"status": "ok", "title": re.sub(" - YouTube$", "", title)}
        except (
            requests.exceptions.RequestException,
            ValueError,
            OSError,
            IndexError,
            KeyError,
        ):
            return {"status": "error"}
def safe_request(
    url, receive_timeout=10, max_size=25000000, mimetypes=None, partial_read=False
):
    """Fetch *url* with timeout, content-type and size restrictions.

    Returns a tuple ``(requests.Response, bytes)``.  If *partial_read* is
    true, approximately the first *max_size* bytes of the body are
    returned; otherwise a ValueError is raised when *max_size* would be
    exceeded.

    Raises ValueError on any fetch failure, wrong content type,
    oversized response, or when *receive_timeout* elapses while reading
    the body.  HTTP error statuses propagate via ``raise_for_status``.
    """
    try:
        r = requests.get(
            url,
            stream=True,
            timeout=receive_timeout,
            headers={"User-Agent": "WhatsApp/2"},
        )
    except Exception as exc:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # are no longer swallowed; chain the cause for debuggability.
        raise ValueError("error fetching") from exc
    r.raise_for_status()
    # A missing Content-Length defaults to 1 so this check passes and the
    # streaming loop below enforces the real limit instead.
    if int(r.headers.get("Content-Length", 1)) > max_size and not partial_read:
        raise ValueError("response too large")
    if mimetypes is not None:
        mtype, _ = cgi.parse_header(r.headers.get("Content-Type", ""))
        if mtype not in mimetypes:
            raise ValueError("wrong content type")
    size = 0
    start = time.time()
    # Accumulate chunks and join once: repeated `bytes +=` is quadratic.
    chunks = []
    for chunk in r.iter_content(1024):
        if time.time() - start > receive_timeout:
            raise ValueError("timeout reached")
        gevent.sleep(0)  # Otherwise this loop can block other greenlets for > 0.5s
        size += len(chunk)
        chunks.append(chunk)
        if size > max_size:
            if partial_read:
                # Stop reading early; close the response so the pooled
                # connection is not leaked (headers remain readable).
                r.close()
                return r, b"".join(chunks)
            raise ValueError("response too large")
    return r, b"".join(chunks)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement