Advertisement
em1tao

Untitled

Jun 18th, 2024 (edited)
594
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.58 KB | None | 0 0
  1. #session_factory.py
  2. class SessionFactory:
  3.     def __init__(self, postgres_url: str):
  4.         self.engine: AsyncEngine = create_async_engine(postgres_url, echo=False)
  5.         self.async_session_maker: async_sessionmaker[AsyncSession] = async_sessionmaker(
  6.             self.engine,
  7.             class_=AsyncSession,
  8.             expire_on_commit=False,
  9.         )
  10.  
  11.     async def dispose(self) -> None:
  12.         await self.engine.dispose()
  13.  
  14.     async def __call__(self) -> AsyncGenerator[AsyncSession, None]:
  15.         async with self.async_session_maker() as session:
  16.             yield session
  17.  
  18.  
  19. #database.py
  20.  
  21. session_factory: SessionFactory | None = None
  22.  
  23. async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
  24.     if session_factory is None:
  25.         raise RuntimeError("SessionFactory is not initialized")
  26.     async for session in session_factory():
  27.         yield session
  28.  
  29. async def get_db_stub():
  30.     raise NotImplementedError
  31.  
  32. get_async_session_context = contextlib.asynccontextmanager(get_async_session)
  33.  
  34.  
  35. #routers.py
  36. @router.post(
  37.     "/", response_model=ReleaseCreateResponse, status_code=status.HTTP_201_CREATED
  38. )
  39. async def create_release(
  40.     user: Annotated[User, Depends(current_active_user)],
  41.     session: Annotated[AsyncSession, Depends(get_db_stub)],
  42. ):
  43.  ...
  44.  
  45. #main.py
  46. @asynccontextmanager
  47. async def lifespan(app: FastAPI):
  48.     db.session_factory = SessionFactory(settings.db.url_postgres)
  49.  
  50. app.dependency_overrides[AsyncSession] = db.get_async_session
  51. print(app.dependency_overrides[AsyncSession])
  52. >> function get_async_session at 0x7fd44df4e2a0
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement