import asyncio from enum import Enum from typing import Optional from fastapi import FastAPI from fastapi.exceptions import HTTPException from pydantic import BaseModel from pydantic.fields import Field from starlette.requests import Request app = FastAPI() async def task_handler(app): while not app.state.end_event.is_set(): task = await app.state.queue.get() try: priority, task_info = task task_info.state = TaskState.in_progress # Тут делай что твоя задача требует await asyncio.sleep(task_info.delay) task_info.state = TaskState.ended finally: app.state.queue.task_done() @app.on_event("startup") async def startup(): app.state.queue = asyncio.PriorityQueue() app.state.end_event = asyncio.Event() # На event можно навешать обработчиков gracefully shutdown app.state.tasks = {} asyncio.get_event_loop().create_task(task_handler(app)) @app.on_event("shutdown") async def shutdown(): app.state.end_event.set() class TaskState(str, Enum): created = "created" in_queue = "in_queue" in_progress = "in_progress" ended = "ended" class TaskInfoReq(BaseModel): task_priority: Optional[int] = Field(None, description="Приоритет задачи в очереди. Меньше - раньше") delay: int = Field(None, description="Всякая другая хрень для твоей задачи, просто добавляй поля") class TaskInfo(TaskInfoReq): state: TaskState @app.post("/task") async def create_task(request: Request, task_info: Optional[TaskInfoReq] = TaskInfoReq(delay=1)): task_info = TaskInfo(**task_info.dict(), state=TaskState.created) task_id = len(request.app.state.tasks) if task_info.task_priority is None: # Вообще у Field первый аргумент - значение по-умолчанию, # но дауны на фронтендерах могут положить туда null и все сломается, # поэтому лучше явно задать таким образом task_info.task_priority = task_id # Вот это - намеренная утечка памяти, задачи отсюда никуда не пропадут # Если тебе не интересен статус задачи после завершения, сделай отдельный счетчик для task_id # и сами задачи из словаря после завершения удаляй request.app.state.tasks[task_id] = task_info await request.app.state.queue.put((task_info.task_priority, task_info)) task_info.state = TaskState.in_queue return {"task_id": task_id} @app.get("/state/{task_id}") async def check_state(request: Request, task_id: int): task_info = request.app.state.tasks.get(task_id) if task_info is None: raise HTTPException(status_code=404, detail="Задача не найдена") return task_info