Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """
- Non-working example of namespaced sequences using event listeners.
- Submitted as example code to the SQLAlchemy mailing list.
- """
- import sqlite3
- import sys
- from typing import Collection, List, Optional, Union
- from sqlalchemy import (
- create_engine,
- event,
- Column,
- Integer,
- ForeignKey,
- UniqueConstraint,
- )
- from sqlalchemy.engine import Engine
- import sqlalchemy.ext.declarative.api
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy.orm import relationship, sessionmaker, Session
- from sqlalchemy.orm.session import UOWTransaction
- from sqlalchemy.pool import Pool
- import sqlalchemy.pool.base
# Declarative base class that every mapped model in this example subclasses.
Base: sqlalchemy.ext.declarative.api.DeclarativeMeta = declarative_base()
class Parent(Base):
    """Owner row under which children are numbered; has only a primary key."""

    __tablename__ = "parent"

    # Integer primary key of the parent row.
    parent_id = Column(Integer, primary_key=True)
class Child(Base):
    """Row belonging to one Parent and carrying a per-parent child_number."""

    __tablename__ = "child"
    # A given child_number may appear only once under the same parent.
    __table_args__ = (UniqueConstraint("parent_id", "child_number"),)

    child_id = Column(Integer, primary_key=True)
    # Required link to the owning parent; the foreign key is declared with
    # ondelete="cascade".
    parent_id = Column(
        Integer, ForeignKey("parent.parent_id", ondelete="cascade"), nullable=False
    )
    # Required number of this child within its parent.
    child_number = Column(Integer, nullable=False)

    parent = relationship("Parent")
class ChildNumberSequence(Base):
    """Per-parent counter row recording the last child number handed out."""

    __tablename__ = "child_number_sequence"

    # One counter row per parent: the parent's key is also this table's
    # primary key, and the foreign key is declared with ondelete="cascade".
    parent_id = Column(
        Integer, ForeignKey("parent.parent_id", ondelete="cascade"), primary_key=True
    )
    # Most recently assigned child number; the server default is "0".
    last_child_number = Column(Integer, nullable=False, server_default="0")

    parent = relationship("Parent")
def insert_child_number_sequence(
    session: Session,
    _flush_context: UOWTransaction,
    _instances: Optional[Collection[Base]],
) -> None:
    """Create a ChildNumberSequence for every pending Parent in the session.

    Written with the signature of a ``before_flush`` session listener; only
    ``session`` is used.

    Args:
        session: Session whose ``new`` collection is scanned for Parent
            objects.
        _flush_context: Unused.
        _instances: Unused.
    """
    pending_parents = [obj for obj in session.new if isinstance(obj, Parent)]
    for pending_parent in pending_parents:
        session.add(ChildNumberSequence(parent=pending_parent))
def set_child_number(
    session: Session,
    _flush_context: UOWTransaction,
    _instances: Optional[Collection[Base]],
) -> None:
    """Assign the next per-parent child_number to every pending Child.

    Written with the signature of a ``before_flush`` session listener; only
    ``session`` is used. For each Child in ``session.new`` the
    ChildNumberSequence row of the child's parent is looked up, its
    ``last_child_number`` is incremented, and the new value is stored on the
    child.

    Args:
        session: Session whose ``new`` collection is scanned for Child
            objects.
        _flush_context: Unused.
        _instances: Unused.

    Raises:
        sqlalchemy.orm.exc.NoResultFound: If the query finds no
            ChildNumberSequence row for the child's parent.
        sqlalchemy.orm.exc.MultipleResultsFound: If the query finds more
            than one such row.
    """
    for instance in session.new:
        if not isinstance(instance, Child):
            continue
        child_number_sequence: ChildNumberSequence = (
            session.query(ChildNumberSequence)
            .filter_by(parent=instance.parent)
            .one()
        )
        # Compute the next number from the loaded integer value. Using the
        # class-level column (ChildNumberSequence.last_child_number + 1) would
        # produce a SQL expression object, and copying that onto the child
        # makes the child INSERT reference another table's column instead of
        # carrying a plain integer.
        next_child_number = child_number_sequence.last_child_number + 1
        child_number_sequence.last_child_number = next_child_number
        instance.child_number = next_child_number
        # NOTE(review): this read-then-increment is not atomic across
        # concurrent transactions; the UniqueConstraint on
        # (parent_id, child_number) declared on Child would presumably reject
        # a duplicate number -- confirm for the target database.
        session.add(child_number_sequence)
@event.listens_for(Engine, "connect")
def set_sqlite_pragma(
    dbapi_connection,
    _connection_record: sqlalchemy.pool.base._ConnectionRecord,
) -> None:
    """Enable foreign_keys in SQLite.

    Registered as a "connect" listener on Engine. Connections that are not
    ``sqlite3.Connection`` instances are left untouched.

    Args:
        dbapi_connection: The raw DBAPI connection passed to the listener.
        _connection_record: Unused.
    """
    if not isinstance(dbapi_connection, sqlite3.Connection):
        return
    cursor = dbapi_connection.cursor()
    try:
        cursor.execute("PRAGMA foreign_keys=ON")
    finally:
        # Close the cursor even when the PRAGMA statement raises.
        cursor.close()
def setup_session_listeners(session: Union[Session, sessionmaker]) -> None:
    """Set up event listeners for session. Call with your session/sessionmaker before using it.

    Both listeners are attached to the "before_flush" event, in this order:
    ``insert_child_number_sequence`` first, then ``set_child_number``.

    Args:
        session: The Session or sessionmaker to attach the listeners to.
    """
    for listener in (insert_child_number_sequence, set_child_number):
        event.listen(session, "before_flush", listener)
def main(argv: List[str]):
    """Run the example end to end and check the numbering with assertions.

    Creates the tables, attaches the session listeners, then persists two
    parents and their children, asserting after each commit that the child
    numbers and the per-parent counters have the expected values.

    Args:
        argv: Command-line arguments; ``argv[1]``, when present, is used as
            the database URL, otherwise an in-memory SQLite URL is used.
    """
    url = argv[1] if len(argv) >= 2 else "sqlite:///:memory:"
    engine = create_engine(url, echo=True)
    Base.metadata.create_all(engine)

    session_factory = sessionmaker(bind=engine)
    setup_session_listeners(session_factory)
    session = session_factory()

    def persist(obj):
        # Add the object to the session and commit straight away.
        session.add(obj)
        session.commit()
        return obj

    # First parent: its counter row must exist and start at zero.
    first_parent = persist(Parent())
    first_sequence = session.query(ChildNumberSequence).get(
        first_parent.parent_id
    )
    assert first_sequence.parent_id == first_parent.parent_id
    assert first_sequence.last_child_number == 0

    first_child = persist(Child(parent=first_parent))
    assert first_child.child_number == 1
    assert first_sequence.last_child_number == 1

    second_child = persist(Child(parent=first_parent))
    assert second_child.child_number == 2
    assert first_sequence.last_child_number == 2

    # Second parent: numbering starts over independently of the first.
    second_parent = persist(Parent())
    second_sequence = session.query(ChildNumberSequence).get(
        second_parent.parent_id
    )
    assert second_sequence.parent_id == second_parent.parent_id
    assert second_sequence.last_child_number == 0

    other_child = persist(Child(parent=second_parent))
    assert other_child.child_number == 1
    assert second_sequence.last_child_number == 1
# Script entry point: run the example and exit with main()'s return value.
if __name__ == "__main__":
    exit_status = main(sys.argv)
    sys.exit(exit_status)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement