Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # session_factory.py
- from collections.abc import AsyncGenerator
- from sqlalchemy.ext.asyncio import (
- AsyncSession,
- async_sessionmaker,
- create_async_engine,
- AsyncEngine,
- )
class SessionFactory:
    """Bundle an async engine with a session maker bound to that engine.

    Calling an instance returns an async generator that yields one session
    opened through ``async_session_maker``.
    """

    def __init__(self, postgres_url: str):
        """Create the engine for *postgres_url* and a session maker on top of it.

        Args:
            postgres_url: URL passed unchanged to ``create_async_engine``.
        """
        self.engine: AsyncEngine = create_async_engine(postgres_url, echo=False)
        maker_options = {"class_": AsyncSession, "expire_on_commit": False}
        self.async_session_maker: async_sessionmaker[AsyncSession] = async_sessionmaker(
            self.engine,
            **maker_options,
        )

    async def dispose(self) -> None:
        """Await ``dispose()`` on the underlying engine."""
        await self.engine.dispose()

    async def __call__(self) -> AsyncGenerator[AsyncSession, None]:
        """Yield a single session; its context manager exits when iteration ends."""
        session_cm = self.async_session_maker()
        async with session_cm as db_session:
            yield db_session
- # database.py
- from collections.abc import AsyncGenerator
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy.orm import DeclarativeBase
- from sqlalchemy import MetaData
- import contextlib
- from src.session_factory import SessionFactory
class Base(DeclarativeBase):
    """Declarative base class carrying its own ``MetaData`` instance."""

    # Explicit MetaData object assigned as the class-level ``metadata``.
    metadata = MetaData()
- session_factory: SessionFactory | None = None
- async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
- if session_factory is None:
- raise RuntimeError("SessionFactory is not initialized")
- async for session in session_factory():
- yield session
async def get_db_stub():
    """Always raise ``NotImplementedError`` when awaited.

    NOTE(review): presumably a placeholder meant to be overridden elsewhere;
    confirm against the callers.
    """
    raise NotImplementedError()
- # main.py
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared ``SessionFactory`` on entry and dispose it on exit.

    Args:
        app: The application instance; not used inside this function.

    Yields:
        None. Control is handed back to the caller while the factory is set.
    """
    db.session_factory = SessionFactory(settings.db.url_postgres)
    try:
        yield
    finally:
        # Dispose even when an exception is thrown in at ``yield``, so the
        # engine is not left undisposed on an error exit.
        await db.session_factory.dispose()
Advertisement
Add Comment
Please, Sign In to add comment