Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- from functools import wraps, partial
- import sqlalchemy
- from sqlalchemy.orm import sessionmaker
- from sqlalchemy.ext.declarative import declarative_base
# Create and install the event loop explicitly. Calling
# asyncio.get_event_loop() with no running loop is deprecated and, on the
# newest Python versions, raises instead of implicitly creating one.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Engine for the SQLite database stored in the relative file ``data.db``.
engine = sqlalchemy.create_engine("sqlite:///data.db")

# Declarative base class that the ORM models in this file inherit from.
Base = declarative_base()

# Factory producing ORM sessions bound to ``engine``.
Session = sessionmaker(bind=engine)
class SessionPool:
    """Fixed-size pool of ORM sessions for running blocking DB code from asyncio.

    Sessions are kept in an ``asyncio.Queue``; a coroutine borrows one, runs
    the blocking function in the default executor, and hands the session back.
    """

    def __init__(self, size=10):
        """Create the pool and pre-fill it with ``size`` new sessions."""
        self.sessions = asyncio.Queue(size)
        for _ in range(size):
            self.sessions.put_nowait(Session())

    def async_with_alchemy(self, f):
        """Wrap the blocking function ``f`` so it can be awaited.

        The returned coroutine function accepts the same arguments as ``f``
        minus the session: a pooled session is appended as the last
        positional argument, i.e. ``f(*args, session, **kwargs)`` is executed
        in the loop's default executor.

        Returns whatever ``f`` returns and propagates whatever it raises.
        The session is handed back to the pool in both cases; the pool itself
        performs no rollback, so ``f`` must leave the session usable.
        """

        @wraps(f)
        async def ghosting_session(*args, **kwargs):
            loop = asyncio.get_running_loop()
            partial_func = partial(f, *args, **kwargs)
            # Waits here when every session is currently borrowed.
            session = await self.sessions.get()
            try:
                return await loop.run_in_executor(None, partial_func, session)
            finally:
                # Always give the session back, otherwise a failing call would
                # shrink the pool for good. A slot was freed by the get()
                # above, so put_nowait() cannot find the queue full.
                # NOTE(review): on cancellation the worker thread may still be
                # running with this session -- confirm whether that matters.
                self.sessions.put_nowait(session)

        return ghosting_session
# Module-wide pool (default size) whose ``async_with_alchemy`` decorator is
# applied to the model methods below.
session_pool: SessionPool = SessionPool()
class DemoUser(Base):
    """ORM model mapped to the ``users`` table (integer ``id``, string ``name``)."""

    __tablename__ = "users"

    id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
    name = sqlalchemy.Column(sqlalchemy.String)

    @classmethod
    @session_pool.async_with_alchemy
    def init_data(cls, session):
        """Insert three demo users and return them.

        Because of the decorator, callers invoke ``DemoUser.init_data()``
        without arguments and await the result; ``session`` is supplied from
        the pool and this body runs in an executor thread.

        Returns the list of newly created user objects.
        """
        users = [cls(name=name) for name in ("Rud", "Tester", "Someone else")]
        # The objects must be attached to the session; otherwise commit() has
        # nothing to write and no rows are ever inserted.
        session.add_all(users)
        session.commit()
        return users


# Make sure the ``users`` table exists before init_data() tries to insert
# into it; create_all() leaves tables that already exist untouched.
Base.metadata.create_all(engine)
async def example():
    """Run ``DemoUser.init_data`` twice concurrently and print the first result.

    Each coroutine is awaited exactly once: awaiting the same coroutine
    object a second time raises ``RuntimeError``, and a coroutine that is
    never awaited does not run at all.
    """
    users, _users2 = await asyncio.gather(
        DemoUser.init_data(),
        DemoUser.init_data(),
    )
    # Print the returned list of users, not the coroutine object.
    print(users)
# Script entry: drive the demo coroutine to completion on the module-level
# event loop.
demo = example()
loop.run_until_complete(demo)
Advertisement
Add Comment
Please, Sign In to add comment