Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # src/adapters/primary/api/routers/reports/report_views.py
- @router.put(path="...")
- async def report(
- start_dt: date = Query(default=date.today().strftime("%Y-%m-%d")),
- end_dt: date = Query(default=datetime.today().strftime("%Y-%m-%d")),
- use_case: ReportUseCase = Depends(get_report_use_case),
- background_tasks: BackgroundTasks = BackgroundTasks(),
- ):
- async def execute_with_cache_cleanup(
- start_dt: date,
- end_dt: date,
- payload: ReportCreatePayload,
- task_key: str,
- task_id: str,
- ):
- try:
- await use_case.execute(
- start_dt=start_dt,
- end_dt=end_dt,
- payload=payload,
- )
- finally:
- # Удаляем только если это та же задача (на случай перезаписи)
- if _task_cache.get(task_key, {}).get("task_id") == task_id:
- _task_cache.pop(task_key, None)
- # Создаем уникальный ключ для параметров
- task_key = f"system_report_{start_dt}_{end_dt}"
- task_id = gen_uuid()
- # Проверяем наличие задачи в кэше
- if task_key in _task_cache:
- return DuplicateTask(
- message="Задача уже выполняется",
- details="Попробуйте позже или измените параметры периода",
- existing_task_id=task_id,
- )
- # Регистрируем задачу в кэше
- _task_cache[task_key] = {"task_id": task_id, "created_at": time()}
- filename_to_save = "..."
- filename_tpl = "..."
- payload = ReportCreatePayload(tpl=filename_tpl, filename_to_save=filename_to_save)
- background_tasks.add_task(
- execute_with_cache_cleanup,
- start_dt=start_dt,
- end_dt=end_dt,
- payload=payload,
- task_key=task_key,
- task_id=task_id,
- )
- return ReportCreateResponse(filename=filename_to_save)
- # src/adapters/secondary/taskiq/broker.py
- __all__ = ("broker",)
- from taskiq_aio_pika import AioPikaBroker
- from taskiq_redis import RedisAsyncResultBackend
- from src.adapters.settings import app_config
- def get_broker():
- broker = AioPikaBroker(str(app_config.RABBITMQ_URL), persistent=True)
- result_backend = RedisAsyncResultBackend(
- str(app_config.REDIS_URL),
- result_ex_time=3600, # 1 час TTL
- )
- return broker.with_result_backend(result_backend)
- broker = get_broker()
- # src/adapters/primary/api/dependencies/taskiq.py
- # Тут надо реализовать проверку дублирующей задачи
- import hashlib
- from src.adapters.secondary.taskiq.broker import broker
- async def check_duplicate_task(task_name: str, params: dict) -> str | None:
- """Проверяет наличие дублирующей задачи"""
- # Генерируем уникальный ключ на основе параметров
- params_hash = hashlib.md5(f"{task_name}_{params}".encode()).hexdigest()
- task_key = f"task_lock:{task_name}:{params_hash}"
- # Проверяем в Redis (через result_backend)
- # Реализация зависит от конкретного бэкенда
- existing_task = broker.find_task(params_hash)
- if existing_task and existing_task.status not in ["success", "failure"]:
- return existing_task.task_id # Задача уже выполняется
- return None
Advertisement
Add Comment
Please, Sign In to add comment