Advertisement
Guest User

Untitled

a guest
Apr 11th, 2021
484
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.33 KB | None | 0 0
  1. def grab_title(url):
  2. """Start the grab title process. Returns a response with a token
  3. which can be used to get the actual title via socketio, once it has
  4. been fetched."""
  5. if config.app.testing:
  6. return jsonify(grab_title_async(current_app, url))
  7. else:
  8. token = "title-" + str(uuid.uuid4())
  9. gevent.spawn(
  10. send_title_grab_async, current_app._get_current_object(), url, token
  11. )
  12. return jsonify(status="deferred", token=token)
  13.  
  14.  
  15. def send_title_grab_async(app, url, token):
  16. """Grab the title from the url and send it to whoever might be waiting
  17. via socketio."""
  18. result = grab_title_async(app, url)
  19. result.update(target=token)
  20. with app.app_context():
  21. send_deferred_event("grab_title", token, result)
  22.  
  23.  
  24. def grab_title_async(app, url):
  25. with app.app_context():
  26. try:
  27. resp, data = safe_request(
  28. url, max_size=500000, mimetypes={"text/html"}, partial_read=True
  29. )
  30.  
  31. # Truncate the HTML so less parsing work will be required.
  32. end_title_pos = data.find(b"</title>")
  33. if end_title_pos == -1:
  34. raise ValueError
  35. data = data[:end_title_pos] + b"</title></head><body></body>"
  36.  
  37. _, options = cgi.parse_header(resp.headers.get("Content-Type", ""))
  38. charset = options.get("charset", "utf-8")
  39. og = BeautifulSoup(data, "lxml", from_encoding=charset)
  40. title = og("title")[0].text
  41. title = title.strip(WHITESPACE)
  42. title = re.sub(" - YouTube$", "", title)
  43. return {"status": "ok", "title": title}
  44. except (
  45. requests.exceptions.RequestException,
  46. ValueError,
  47. OSError,
  48. IndexError,
  49. KeyError,
  50. ):
  51. return {"status": "error"}
  52.  
  53.  
  54. def safe_request(
  55. url, receive_timeout=10, max_size=25000000, mimetypes=None, partial_read=False
  56. ):
  57. """Gets stuff from the internet, with timeouts, content type and size
  58. restrictions. If partial_read is True it will return approximately
  59. the first max_size bytes, otherwise it will raise an error if
  60. max_size is exceeded."""
  61. # Returns (Response, File)
  62. try:
  63. r = requests.get(
  64. url,
  65. stream=True,
  66. timeout=receive_timeout,
  67. headers={"User-Agent": "WhatsApp/2"},
  68. )
  69. except: # noqa
  70. raise ValueError("error fetching")
  71. r.raise_for_status()
  72.  
  73. if int(r.headers.get("Content-Length", 1)) > max_size and not partial_read:
  74. raise ValueError("response too large")
  75.  
  76. if mimetypes is not None:
  77. mtype, _ = cgi.parse_header(r.headers.get("Content-Type", ""))
  78. if mtype not in mimetypes:
  79. raise ValueError("wrong content type")
  80.  
  81. size = 0
  82. start = time.time()
  83. f = b""
  84. for chunk in r.iter_content(1024):
  85. if time.time() - start > receive_timeout:
  86. raise ValueError("timeout reached")
  87. gevent.sleep(0) # Otherwise this loop can block other greenlets for > 0.5s
  88.  
  89. size += len(chunk)
  90. f += chunk
  91. if size > max_size:
  92. if partial_read:
  93. return r, f
  94. else:
  95. raise ValueError("response too large")
  96. return r, f
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement