jlxip

Tiny async Gentoo distfiles cache proxy

Dec 12th, 2025
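
A single-file aiohttp proxy: GET requests are served straight from ./cache when the file is already there; on a miss the file is streamed from the first origin that has it and written to disk at the same time, so the next request is local. HEAD requests for uncached files are relayed upstream.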
#!/usr/bin/env python3
"""Tiny async Gentoo distfiles cache proxy."""

import asyncio
from pathlib import Path, PurePosixPath

import aiohttp
from aiohttp import web

# Mirrors tried in order; the first one that doesn't 404 wins.
ORIGINS = [
    "https://gentoo.mirrors.ovh.net/gentoo-distfiles/",
    "https://distfiles.gentoo.org/releases/amd64/binpackages/23.0/x86-64/",
]
CACHE_DIR = Path("cache").resolve()
HOST = "0.0.0.0"
PORT = 80  # binding to port 80 usually requires root or CAP_NET_BIND_SERVICE
CHUNK_SIZE = 1 << 16  # 64 KiB per read/write

CACHE_DIR.mkdir(parents=True, exist_ok=True)
# One lock per relative path so concurrent misses don't fetch the same file twice.
locks: dict[str, asyncio.Lock] = {}
session: aiohttp.ClientSession | None = None


def clean_relpath(raw: str) -> str:
    """Reject empty, absolute and directory-traversal paths."""
    rel = PurePosixPath(raw)
    # PurePosixPath("") and PurePosixPath(".") both normalise to an empty
    # parts tuple, so checking rel.parts also catches an empty request path
    # (the original `rel.as_posix() == ""` test could never fire).
    if not rel.parts or rel.is_absolute() or ".." in rel.parts:
        raise web.HTTPBadRequest(text="invalid path")
    return rel.as_posix()


async def get_session() -> aiohttp.ClientSession:
    """Lazily create one shared upstream session inside the running loop."""
    global session
    if session is None:
        timeout = aiohttp.ClientTimeout(total=None, connect=30)
        connector = aiohttp.TCPConnector(limit=0, ttl_dns_cache=300)
        session = aiohttp.ClientSession(timeout=timeout, connector=connector)
    return session


async def handler(request: web.Request) -> web.StreamResponse:
    if request.method not in ("GET", "HEAD"):
        raise web.HTTPMethodNotAllowed(request.method, ["GET", "HEAD"])

    rel = clean_relpath(request.match_info.get("tail", ""))
    local_path = CACHE_DIR / rel

    # is_file() rather than exists(): intermediate cache directories must
    # not be handed to FileResponse.
    if request.method == "HEAD":
        if local_path.is_file():
            return web.FileResponse(local_path)
        return await head_upstream(rel)

    if local_path.is_file():
        print(f"cached -> {rel}")
        return web.FileResponse(local_path)

    return await stream_and_cache(request, rel, local_path)


async def head_upstream(rel: str) -> web.Response:
    """HEAD the origins in order and relay the first useful answer."""
    sess = await get_session()
    for origin in ORIGINS:
        url = origin.rstrip("/") + "/" + rel
        async with sess.head(url) as upstream:
            if upstream.status == 404:
                continue
            if upstream.status >= 400:
                raise web.HTTPBadGateway(text=f"upstream status {upstream.status}")
            headers = {
                name: upstream.headers[name]
                for name in ("Content-Length", "Content-Type", "ETag", "Last-Modified")
                if name in upstream.headers
            }
            return web.Response(status=upstream.status, headers=headers)
    raise web.HTTPNotFound()


async def stream_and_cache(
    request: web.Request, rel: str, local_path: Path
) -> web.StreamResponse:
    sess = await get_session()
    lock = locks.setdefault(rel, asyncio.Lock())

    async with lock:
        # Another request may have finished the download while we waited on
        # the lock (the original asserted the file was absent, which would
        # crash exactly in that case).
        if local_path.is_file():
            return web.FileResponse(local_path)
        tmp_path = local_path.with_name(local_path.name + ".part")
        tmp_path.parent.mkdir(parents=True, exist_ok=True)

        response = web.StreamResponse()
        client_connected = True

        try:
            for origin in ORIGINS:
                url = origin.rstrip("/") + "/" + rel
                print(f"cache miss -> {url}")

                async with sess.get(url) as upstream:
                    if upstream.status == 404:
                        continue
                    if upstream.status >= 400:
                        raise web.HTTPBadGateway(
                            text=f"upstream status {upstream.status}"
                        )

                    for name in ("Content-Length", "Content-Type"):
                        if name in upstream.headers:
                            response.headers[name] = upstream.headers[name]

                    response.set_status(upstream.status)
                    await response.prepare(request)

                    # Keep writing to disk even if the client drops, so the
                    # download still lands in the cache.
                    with tmp_path.open("wb") as fh:
                        async for chunk in upstream.content.iter_chunked(CHUNK_SIZE):
                            fh.write(chunk)
                            if client_connected:
                                try:
                                    await response.write(chunk)
                                except ConnectionResetError:
                                    client_connected = False

                    # Atomic rename: the cache never exposes half-written files.
                    tmp_path.replace(local_path)
                    if client_connected:
                        await response.write_eof()

                    print(f"cached {rel}")
                    return response
            raise web.HTTPNotFound()
        except Exception:
            # Drop any partial download so a retry starts clean.
            if tmp_path.exists():
                try:
                    tmp_path.unlink()
                except OSError:
                    pass
            raise
        finally:
            locks.pop(rel, None)


app = web.Application()
app.router.add_route("*", "/{tail:.*}", handler)


async def _cleanup(_: web.Application) -> None:
    if session:
        await session.close()


app.on_cleanup.append(_cleanup)
web.run_app(app, host=HOST, port=PORT)
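
A minimal smoke test, assuming the proxy is up on 127.0.0.1:80 and that REL is replaced with the path of a file one of the origins actually serves (the value below is a placeholder, not part of the proxy): fetch the same path twice and check that the cached copy matches the upstream one.

#!/usr/bin/env python3
"""Smoke-test client for the cache proxy (hypothetical REL, adjust before running)."""

import asyncio

import aiohttp

REL = "distfiles/some-package-1.0.tar.gz"  # placeholder: substitute a real distfile path

async def main() -> None:
    async with aiohttp.ClientSession() as sess:
        # First request: cache miss, the proxy streams from upstream while writing to ./cache.
        async with sess.get(f"http://127.0.0.1/{REL}") as resp:
            first = await resp.read()
            print("first fetch:", resp.status, len(first), "bytes")
        # Second request: should be served from ./cache with identical bytes.
        async with sess.get(f"http://127.0.0.1/{REL}") as resp:
            second = await resp.read()
            print("second fetch:", resp.status, len(second), "bytes")
        assert first == second

asyncio.run(main())

On the Gentoo side, pointing GENTOO_MIRRORS in /etc/portage/make.conf at the host running the proxy (e.g. GENTOO_MIRRORS="http://proxy-host/", with proxy-host being wherever you deploy it) should make Portage fetch distfiles through the cache.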