Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- from enum import Enum
- from typing import Optional
- from fastapi import FastAPI
- from fastapi.exceptions import HTTPException
- from pydantic import BaseModel
- from pydantic.fields import Field
- from starlette.requests import Request
- app = FastAPI()
- async def task_handler(app):
- while not app.state.end_event.is_set():
- task = await app.state.queue.get()
- try:
- priority, task_info = task
- task_info.state = TaskState.in_progress
- # Тут делай что твоя задача требует
- await asyncio.sleep(task_info.delay)
- task_info.state = TaskState.ended
- finally:
- app.state.queue.task_done()
- @app.on_event("startup")
- async def startup():
- app.state.queue = asyncio.PriorityQueue()
- app.state.end_event = asyncio.Event() # На event можно навешать обработчиков gracefully shutdown
- app.state.tasks = {}
- asyncio.get_event_loop().create_task(task_handler(app))
- @app.on_event("shutdown")
- async def shutdown():
- app.state.end_event.set()
- class TaskState(str, Enum):
- created = "created"
- in_queue = "in_queue"
- in_progress = "in_progress"
- ended = "ended"
- class TaskInfoReq(BaseModel):
- task_priority: Optional[int] = Field(None, description="Приоритет задачи в очереди. Меньше - раньше")
- delay: int = Field(None, description="Всякая другая хрень для твоей задачи, просто добавляй поля")
- class TaskInfo(TaskInfoReq):
- state: TaskState
- @app.post("/task")
- async def create_task(request: Request, task_info: Optional[TaskInfoReq] = TaskInfoReq(delay=1)):
- task_info = TaskInfo(**task_info.dict(), state=TaskState.created)
- task_id = len(request.app.state.tasks)
- if task_info.task_priority is None:
- # Вообще у Field первый аргумент - значение по-умолчанию,
- # но дауны на фронтендерах могут положить туда null и все сломается,
- # поэтому лучше явно задать таким образом
- task_info.task_priority = task_id
- # Вот это - намеренная утечка памяти, задачи отсюда никуда не пропадут
- # Если тебе не интересен статус задачи после завершения, сделай отдельный счетчик для task_id
- # и сами задачи из словаря после завершения удаляй
- request.app.state.tasks[task_id] = task_info
- await request.app.state.queue.put((task_info.task_priority, task_info))
- task_info.state = TaskState.in_queue
- return {"task_id": task_id}
- @app.get("/state/{task_id}")
- async def check_state(request: Request, task_id: int):
- task_info = request.app.state.tasks.get(task_id)
- if task_info is None:
- raise HTTPException(status_code=404, detail="Задача не найдена")
- return task_info
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement