Advertisement
Guest User

Untitled

a guest
Oct 23rd, 2019
102
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.74 KB | None | 0 0
  1. """
  2. Non-working example of namespaced sequences using event listeners.
  3. Submitted as example code to the SQLAlchemy mailing list.
  4. """
  5.  
  6. import sqlite3
  7. import sys
  8. from typing import Collection, List, Optional, Union
  9.  
  10. from sqlalchemy import (
  11. create_engine,
  12. event,
  13. Column,
  14. Integer,
  15. ForeignKey,
  16. UniqueConstraint,
  17. )
  18. from sqlalchemy.engine import Engine
  19. import sqlalchemy.ext.declarative.api
  20. from sqlalchemy.ext.declarative import declarative_base
  21. from sqlalchemy.orm import relationship, sessionmaker, Session
  22. from sqlalchemy.orm.session import UOWTransaction
  23. from sqlalchemy.pool import Pool
  24. import sqlalchemy.pool.base
  25.  
# Declarative base class that every mapped class in this module inherits from;
# its ``metadata`` is what ``create_all`` uses to emit the tables.
Base: sqlalchemy.ext.declarative.api.DeclarativeMeta = declarative_base()
  27.  
  28.  
  29. class Parent(Base):
  30. __tablename__ = "parent"
  31.  
  32. parent_id = Column(Integer, primary_key=True)
  33.  
  34.  
  35. class Child(Base):
  36. __tablename__ = "child"
  37.  
  38. child_id = Column(Integer, primary_key=True)
  39. parent_id = Column(
  40. Integer,
  41. ForeignKey("parent.parent_id", ondelete="cascade"),
  42. nullable=False,
  43. )
  44. child_number = Column(Integer, nullable=False)
  45.  
  46. parent = relationship("Parent")
  47.  
  48. __table_args__ = (UniqueConstraint("parent_id", "child_number"),)
  49.  
  50.  
  51. class ChildNumberSequence(Base):
  52. """Table for keeping track of the next child number per parent."""
  53.  
  54. __tablename__ = "child_number_sequence"
  55.  
  56. parent_id = Column(
  57. Integer,
  58. ForeignKey("parent.parent_id", ondelete="cascade"),
  59. primary_key=True,
  60. )
  61. last_child_number = Column(Integer, nullable=False, server_default="0")
  62.  
  63. parent = relationship("Parent")
  64.  
  65.  
  66. def insert_child_number_sequence(
  67. session: Session,
  68. _flush_context: UOWTransaction,
  69. _instances: Optional[Collection[Base]],
  70. ) -> None:
  71. """Add a row to the child_number_sequence table for each new row in the parent table."""
  72. for instance in session.new:
  73. if not isinstance(instance, Parent):
  74. continue
  75. child_number_sequence = ChildNumberSequence(parent=instance)
  76. session.add(child_number_sequence)
  77.  
  78.  
  79. def set_child_number(
  80. session: Session,
  81. _flush_context: UOWTransaction,
  82. _instances: Optional[Collection[Base]],
  83. ) -> None:
  84. """Set the child_number for each new child based on the last_child_number in the child_number_sequence table."""
  85. for instance in session.new:
  86. if not isinstance(instance, Child):
  87. continue
  88. child_number_sequence: ChildNumberSequence = session.query(
  89. ChildNumberSequence
  90. ).filter_by(parent=instance.parent).one()
  91. child_number_sequence.last_child_number = (
  92. ChildNumberSequence.last_child_number + 1
  93. )
  94. instance.child_number = child_number_sequence.last_child_number
  95. session.add(child_number_sequence)
  96.  
  97.  
  98. @event.listens_for(Engine, "connect")
  99. def set_sqlite_pragma(
  100. dbapi_connection,
  101. _connection_record: sqlalchemy.pool.base._ConnectionRecord,
  102. ) -> None:
  103. """Enable foreign_keys in SQLite."""
  104. if isinstance(dbapi_connection, sqlite3.Connection):
  105. cursor = dbapi_connection.cursor()
  106. cursor.execute("PRAGMA foreign_keys=ON")
  107. cursor.close()
  108.  
  109.  
  110. def setup_session_listeners(session: Session) -> None:
  111. """Set up event listeners for session. Call with your session/sessionmaker before using it."""
  112. event.listen(session, "before_flush", insert_child_number_sequence)
  113. event.listen(session, "before_flush", set_child_number)
  114.  
  115.  
  116. def main(argv: List[str]):
  117. if len(argv) >= 2:
  118. url = argv[1]
  119. else:
  120. url = "sqlite:///:memory:"
  121.  
  122. engine = create_engine(url, echo=True)
  123. Base.metadata.create_all(engine)
  124. session = sessionmaker(bind=engine)
  125. setup_session_listeners(session)
  126. session = session()
  127.  
  128. p1 = Parent()
  129. session.add(p1)
  130. session.commit()
  131.  
  132. child_number_sequence_1 = session.query(ChildNumberSequence).get(
  133. p1.parent_id
  134. )
  135. assert child_number_sequence_1.parent_id == p1.parent_id
  136. assert child_number_sequence_1.last_child_number == 0
  137.  
  138. c1_1 = Child(parent=p1)
  139. session.add(c1_1)
  140. session.commit()
  141.  
  142. assert c1_1.child_number == 1
  143. assert child_number_sequence_1.last_child_number == 1
  144.  
  145. c1_2 = Child(parent=p1)
  146. session.add(c1_2)
  147. session.commit()
  148.  
  149. assert c1_2.child_number == 2
  150. assert child_number_sequence_1.last_child_number == 2
  151.  
  152. p2 = Parent()
  153. session.add(p2)
  154. session.commit()
  155.  
  156. child_number_sequence_2 = session.query(ChildNumberSequence).get(
  157. p2.parent_id
  158. )
  159. assert child_number_sequence_2.parent_id == p2.parent_id
  160. assert child_number_sequence_2.last_child_number == 0
  161.  
  162. c2_1 = Child(parent=p2)
  163. session.add(c2_1)
  164. session.commit()
  165.  
  166. assert c2_1.child_number == 1
  167. assert child_number_sequence_2.last_child_number == 1
  168.  
  169.  
  170. if __name__ == "__main__":
  171. sys.exit(main(sys.argv))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement