Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from aiohttp import web
- import aiohttp
- import yarl
- async def proxy_handler(request: web.Request) -> web.Response:
- """
- Check request contains http url in query args:
- /fetch?url=http%3A%2F%2Fexample.com%2F
- and trying to fetch it and return body with http status.
- If url passed without scheme or is invalid raise 400 Bad request.
- On failure raise 502 Bad gateway.
- :param request: aiohttp.web.Request to handle
- :return: aiohttp.web.Response
- """
- url = yarl.URL(request.url).query.get('url')
- url_scheme = yarl.URL(url).scheme if url else None
- error_status = 400
- if not url:
- text, status = 'No url to fetch', error_status
- elif not url_scheme:
- text, status = 'Empty url scheme', error_status
- elif url_scheme != 'http':
- text, status = 'Bad url scheme: {}'.format(yarl.URL(url).scheme), error_status
- else:
- async with request.app['session'] as session:
- response = await session.get(url)
- text, status = await response.text(), response.status
- return web.Response(text=text, status=status)
- async def setup_application(app: web.Application) -> None:
- """
- Setup application routes and aiohttp session for fetching
- :param app: app to apply settings with
- """
- app['session'] = aiohttp.ClientSession()
- app.router.add_get('/fetch', proxy_handler)
- async def teardown_application(app: web.Application) -> None:
- """
- Application with aiohttp session for tearing down
- :param app: app for tearing down
- """
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement