ruchej

fastapi-taskiq

Sep 24th, 2025
51
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.51 KB | Source Code | 0 0
  1. # src/adapters/primary/api/routers/reports/report_views.py
  2.  
  3. @router.put(path="...")
  4. async def report(
  5.     start_dt: date = Query(default=date.today().strftime("%Y-%m-%d")),
  6.     end_dt: date = Query(default=datetime.today().strftime("%Y-%m-%d")),
  7.     use_case: ReportUseCase = Depends(get_report_use_case),
  8.     background_tasks: BackgroundTasks = BackgroundTasks(),
  9. ):
  10.     async def execute_with_cache_cleanup(
  11.         start_dt: date,
  12.         end_dt: date,
  13.         payload: ReportCreatePayload,
  14.         task_key: str,
  15.         task_id: str,
  16.     ):
  17.         try:
  18.             await use_case.execute(
  19.                 start_dt=start_dt,
  20.                 end_dt=end_dt,
  21.                 payload=payload,
  22.             )
  23.         finally:
  24.             # Удаляем только если это та же задача (на случай перезаписи)
  25.             if _task_cache.get(task_key, {}).get("task_id") == task_id:
  26.                 _task_cache.pop(task_key, None)
  27.  
  28.     # Создаем уникальный ключ для параметров
  29.     task_key = f"system_report_{start_dt}_{end_dt}"
  30.     task_id = gen_uuid()
  31.  
  32.     # Проверяем наличие задачи в кэше
  33.     if task_key in _task_cache:
  34.         return DuplicateTask(
  35.             message="Задача уже выполняется",
  36.             details="Попробуйте позже или измените параметры периода",
  37.             existing_task_id=task_id,
  38.         )
  39.  
  40.     # Регистрируем задачу в кэше
  41.     _task_cache[task_key] = {"task_id": task_id, "created_at": time()}
  42.  
  43.     filename_to_save = "..."
  44.     filename_tpl = "..."
  45.  
  46.     payload = ReportCreatePayload(tpl=filename_tpl, filename_to_save=filename_to_save)
  47.  
  48.     background_tasks.add_task(
  49.         execute_with_cache_cleanup,
  50.         start_dt=start_dt,
  51.         end_dt=end_dt,
  52.         payload=payload,
  53.         task_key=task_key,
  54.         task_id=task_id,
  55.     )
  56.  
  57.     return ReportCreateResponse(filename=filename_to_save)
  58.  
  59.  
  60. # src/adapters/secondary/taskiq/broker.py
  61.  
  62. __all__ = ("broker",)
  63. from taskiq_aio_pika import AioPikaBroker
  64. from taskiq_redis import RedisAsyncResultBackend
  65.  
  66. from src.adapters.settings import app_config
  67.  
  68.  
  69. def get_broker():
  70.     broker = AioPikaBroker(str(app_config.RABBITMQ_URL), persistent=True)
  71.     result_backend = RedisAsyncResultBackend(
  72.         str(app_config.REDIS_URL),
  73.         result_ex_time=3600,  # 1 час TTL
  74.     )
  75.     return broker.with_result_backend(result_backend)
  76.  
  77. broker = get_broker()
  78.  
  79.  
  80. # src/adapters/primary/api/dependencies/taskiq.py
  81. # Тут надо реализовать проверку дублирующей задачи
  82.  
  83. import hashlib
  84.  
  85. from src.adapters.secondary.taskiq.broker import broker
  86.  
  87.  
  88. async def check_duplicate_task(task_name: str, params: dict) -> str | None:
  89.     """Проверяет наличие дублирующей задачи"""
  90.     # Генерируем уникальный ключ на основе параметров
  91.  
  92.     params_hash = hashlib.md5(f"{task_name}_{params}".encode()).hexdigest()
  93.     task_key = f"task_lock:{task_name}:{params_hash}"
  94.  
  95.     # Проверяем в Redis (через result_backend)
  96.     # Реализация зависит от конкретного бэкенда
  97.     existing_task = broker.find_task(params_hash)
  98.     if existing_task and existing_task.status not in ["success", "failure"]:
  99.         return existing_task.task_id  # Задача уже выполняется
  100.  
  101.     return None
  102.  
Advertisement
Add Comment
Please, Sign In to add comment