Advertisement
Guest User

2ch

a guest
Jul 7th, 2021
76
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.10 KB | None | 0 0
  1. import asyncio
  2. from enum import Enum
  3. from typing import Optional
  4.  
  5. from fastapi import FastAPI
  6. from fastapi.exceptions import HTTPException
  7. from pydantic import BaseModel
  8. from pydantic.fields import Field
  9. from starlette.requests import Request
  10.  
  11. app = FastAPI()
  12.  
  13. async def task_handler(app):
  14.     while not app.state.end_event.is_set():
  15.         task = await app.state.queue.get()
  16.         try:
  17.             priority, task_info = task
  18.             task_info.state = TaskState.in_progress
  19.             # Тут делай что твоя задача требует
  20.             await asyncio.sleep(task_info.delay)
  21.             task_info.state = TaskState.ended
  22.         finally:
  23.             app.state.queue.task_done()
  24.  
  25. @app.on_event("startup")
  26. async def startup():
  27.     app.state.queue = asyncio.PriorityQueue()
  28.     app.state.end_event = asyncio.Event() # На event можно навешать обработчиков gracefully shutdown
  29.     app.state.tasks = {}
  30.     asyncio.get_event_loop().create_task(task_handler(app))
  31.  
  32. @app.on_event("shutdown")
  33. async def shutdown():
  34.     app.state.end_event.set()
  35.  
  36. class TaskState(str, Enum):
  37.     created = "created"
  38.     in_queue = "in_queue"
  39.     in_progress = "in_progress"
  40.     ended = "ended"
  41.  
  42. class TaskInfoReq(BaseModel):
  43.     task_priority: Optional[int] = Field(None, description="Приоритет задачи в очереди. Меньше - раньше")
  44.     delay: int = Field(None, description="Всякая другая хрень для твоей задачи, просто добавляй поля")
  45.  
  46. class TaskInfo(TaskInfoReq):
  47.     state: TaskState
  48.  
  49. @app.post("/task")
  50. async def create_task(request: Request, task_info: Optional[TaskInfoReq] = TaskInfoReq(delay=1)):
  51.     task_info = TaskInfo(**task_info.dict(), state=TaskState.created)
  52.     task_id = len(request.app.state.tasks)
  53.     if task_info.task_priority is None:
  54.         # Вообще у Field первый аргумент - значение по-умолчанию,
  55.         # но дауны на фронтендерах могут положить туда null и все сломается,
  56.         # поэтому лучше явно задать таким образом
  57.         task_info.task_priority = task_id
  58.  
  59.     # Вот это - намеренная утечка памяти, задачи отсюда никуда не пропадут
  60.     # Если тебе не интересен статус задачи после завершения, сделай отдельный счетчик для task_id
  61.     # и сами задачи из словаря после завершения удаляй
  62.     request.app.state.tasks[task_id] = task_info
  63.  
  64.     await request.app.state.queue.put((task_info.task_priority, task_info))
  65.     task_info.state = TaskState.in_queue
  66.     return {"task_id": task_id}
  67.  
  68.  
  69. @app.get("/state/{task_id}")
  70. async def check_state(request: Request, task_id: int):
  71.     task_info = request.app.state.tasks.get(task_id)
  72.     if task_info is None:
  73.         raise HTTPException(status_code=404, detail="Задача не найдена")
  74.     return task_info
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement