Guest User

Untitled

a guest
Jan 4th, 2026
102
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.89 KB | None | 0 0
  1. import asyncio
  2. import dataclasses
  3. import typing
  4.  
  5. import httpx
  6.  
  7.  
  @dataclasses.dataclass
  class HttpRequest:
      """Description of an HTTP request that a task asks its driver to perform.

      The request carries only the target URL.
      """

      # Address the driver should fetch.
      url: str
  11.  
  12.  
  @dataclasses.dataclass
  class HttpResponse:
      """Outcome of a performed request, as handed back to the task."""

      # Numeric HTTP status code of the response.
      code: int
      # Raw, undecoded response payload.
      body: bytes
  17.  
  18.  
  19. type HttpGenerator[R] = typing.Generator[HttpRequest, HttpResponse, R]
  20.  
  21.  
  def http_once(
      url: str,
  ) -> HttpGenerator[HttpResponse]:
      """Ask the driver for a single request to ``url`` and return its response.

      The generator yields exactly one ``HttpRequest``; whatever the driver
      sends back becomes the generator's return value. Progress is printed
      before the request is yielded and after the response arrives.
      """
      print("requesting", url)
      request = HttpRequest(url=url)
      response = yield request
      print("response was", response)
      return response
  29.  
  30.  
  def do_multiple_requests() -> HttpGenerator[bool]:
      """Issue two requests in sequence and compare the sizes of their bodies.

      This demonstrates chaining several requests inside one task, as needed
      for example during an authentication sequence. The original motivation
      was grabbing the XSRF token from the first response and using it in the
      second request.

      Returns:
          True when the first body is strictly longer than the second.
      """
      google_response = yield from http_once("https://google.com")
      baidu_response = yield from http_once("https://baidu.com")
      google_size = len(google_response.body)
      baidu_size = len(baidu_response.body)
      return google_size > baidu_size
  37.  
  38.  
  39. def execute_sync[R](task: HttpGenerator[R]) -> R:
  40.     with httpx.Client() as client:
  41.         current_request = next(task)
  42.         while True:
  43.             resp = client.get(url=current_request.url)
  44.             prepared = HttpResponse(code=resp.status_code, body=resp.content)
  45.             try:
  46.                 current_request = task.send(prepared)
  47.             except StopIteration as e:
  48.                 return e.value
  49.  
  50.  
  51. async def execute_async[R](task: HttpGenerator[R]) -> R:
  52.     async with httpx.AsyncClient() as client:
  53.         current_request = next(task)
  54.         while True:
  55.             resp = await client.get(url=current_request.url)
  56.             prepared = HttpResponse(code=resp.status_code, body=resp.content)
  57.             try:
  58.                 current_request = task.send(prepared)
  59.             except StopIteration as e:
  60.                 return e.value
  61.  
  62.  
  def main() -> None:
      """Run the demo task with the sync driver, then with the async driver."""
      print("sync version")
      execute_sync(do_multiple_requests())

      print("async version")
      asyncio.run(execute_async(do_multiple_requests()))


  # Guard the entry point so that importing this module does not fire
  # network requests as a side effect.
  if __name__ == "__main__":
      main()
  68.  
Advertisement
Add Comment
Please, Sign In to add comment