Advertisement
simlmx

stackoverflow question

Mar 12th, 2016
167
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 5.65 KB | None | 0 0
  1. import argparse, sys
  2. from pprint import pprint
  3. try:
  4.     import sqlalchemy
  5. except ImportError:
  6.     print('you need to install sqlalchemy: run `pip install sqlalchemy`')
  7.     sys.exit()
  8. try:
  9.     import psycopg2
  10. except ImportError:
  11.     print('you need to install psycopg2: run `pip install psycopg2`')
  12.     sys.exit()
  13.  
  14. from sqlalchemy import Column, String, Integer, ForeignKey, create_engine, desc
  15. from sqlalchemy.orm import sessionmaker, aliased
  16. from sqlalchemy.ext.declarative import declarative_base
  17. from sqlalchemy.exc import ProgrammingError
  18.  
# Table definitions
# Declarative base class: the mapped classes in this file (Item, ItemVersion)
# derive from it, and Base.metadata is what the script uses to create their tables.
Base = declarative_base()
  21.  
  22. class Item(Base):
  23.     id = Column(Integer, primary_key=True)
  24.     __tablename__ = 'item'
  25.  
  26.     def __repr__(self):
  27.         return 'Item(id={})'.format(self.id)
  28.  
  29.  
  30. class ItemVersion(Base):
  31.     id = Column(Integer, primary_key=True)
  32.     item_id = Column(Integer, ForeignKey('item.id'))
  33.     version = Column(Integer)
  34.     text = Column(String)
  35.     __tablename__ = 'item_version'
  36.  
  37.     def __repr__(self):
  38.         return 'ItemVersion(id={}, text={})'.format(self.id, self.text)
  39.  
  40.  
  41. def create_database(postgres_db):
  42.     # Create the database
  43.     engine_postgres = create_engine(postgres_db)
  44.     SessionPostgres = sessionmaker(bind=engine_postgres)
  45.  
  46.     session = SessionPostgres()
  47.  
  48.     session.execute('commit')
  49.     try:
  50.         session.execute('create database ' + db_name)
  51.     except ProgrammingError:
  52.         # the database already exists
  53.         session.execute('drop database ' + db_name)
  54.         session.execute('create database ' + db_name)
  55.     session.close()
  56.     engine_postgres.dispose()
  57.  
  58.  
  59. def fill_database(session):
  60.     for i in range(3):
  61.         session.add(Item(id=i + 1))
  62.     session.commit()
  63.     for i, (item_id, version) in enumerate([
  64.             (1, 0), (1, 1), (2, 0), (3, 0)]):
  65.         session.add(ItemVersion(
  66.             id=i + 1,
  67.             item_id=item_id,
  68.             version=version,
  69.             text = 'item_{}_v{}'.format(item_id, version)
  70.         ))
  71.     session.commit()
  72.  
  73.  
  74. # This is the missing function!
  75. # ----------------- TODO -------------- #
  76.  
  77. def join_version(session, query):
  78.     sq = aliased(Item, query.subquery('sq'))
  79.  
  80.     sq2 = session.query(sq, ItemVersion) \
  81.         .distinct(sq.id) \
  82.         .join(ItemVersion) \
  83.         .order_by(sq.id, desc(ItemVersion.version))
  84.     # return sq2
  85.     # TODO this doesn't work!
  86.     # q = session.query(Item, ItemVersion) \
  87.     #     .select_from(sq2) \
  88.     #     .order_by(*query._order_by)
  89.     return sq2
  90.  
  91. # --------------- END TODO ------------ #
  92.  
  93.  
  94. def test_join_version(session):
  95.     # This is the first query in Items
  96.     item_query = session.query(Item) \
  97.         .filter(Item.id != 2) \
  98.         .order_by(desc(Item.id))
  99.  
  100.     # And this should join the ItemVersions to it
  101.     result = join_version(session, item_query).all()
  102.  
  103.     # Here is what the results should be like
  104.     # result = [(session.query(Item).filter(Item.id == 3).one(),
  105.     #            session.query(ItemVersion).filter(ItemVersion.id == 4).one()),
  106.     #           (session.query(Item).filter(Item.id == 1).one(),
  107.     #            session.query(ItemVersion).filter(ItemVersion.id == 2).one())]
  108.  
  109.     # Valid that it works
  110.     try:
  111.         # right number of elements
  112.         assert len(result) == 2, 'wrong nb of items'
  113.         # right type
  114.         assert isinstance(result[0][0], Item), 'not instance of Item'
  115.         assert isinstance(result[0][1], ItemVersion), 'not instance of ItemVersion'
  116.         # right Items
  117.         assert result[0][0].id == 3, 'wrong first Item'
  118.         assert result[1][0].id == 1, 'wrong second Item'
  119.         # right ItemVersions
  120.         assert result[0][1].text == 'item_3_v0', 'wrong first ItemVersion'
  121.         assert result[1][1].text == 'item_1_v1', 'wrong second ItemVersion'
  122.     except AssertionError as e:
  123.         print('Still does not work :(')
  124.         print(e)
  125.         print('results were:')
  126.         pprint(result)
  127.     else:
  128.         print('You did it! You are my hero!')
  129.  
  130.  
  131. def drop_database():
  132.     engine_postgres = create_engine(postgres_db)
  133.     SessionPostgres = sessionmaker(bind=engine_postgres)
  134.     session = SessionPostgres()
  135.     session.execute('commit')
  136.     session.execute('drop database ' + db_name)
  137.     session.close()
  138.     engine_postgres.dispose()
  139.  
  140.  
  141. if __name__ == '__main__':
  142.     parser = argparse.ArgumentParser(
  143.         description='code for my stackoverflow question')
  144.     parser.add_argument('-H', '--host', default='localhost',
  145.                         help='postgresql host')
  146.     parser.add_argument('-P', '--port', default='5432', help='postgresql port')
  147.     parser.add_argument('-u', '--username', default='postgres',
  148.                         help='postgresql username')
  149.     parser.add_argument('-p','--password', default='postgres',
  150.                         help='postgresql username', required=True)
  151.     args = parser.parse_args()
  152.  
  153.     # Some constants for the database
  154.     postgres_db = 'postgresql+psycopg2://{user}:{password}@{host}:{port}'.format(
  155.         user=args.username,
  156.         password=args.password,
  157.         host=args.host,
  158.         port=args.port
  159.     )
  160.     db_name = 'super_so_qtion'
  161.     super_so_qtion_db = postgres_db + '/' + db_name
  162.  
  163.     # Creating the database
  164.     create_database(postgres_db)
  165.  
  166.     engine = create_engine(super_so_qtion_db)
  167.     Session = sessionmaker(bind=engine)
  168.     session = Session()
  169.  
  170.     # Creating the tables
  171.     Base.metadata.create_all(engine)
  172.  
  173.     # Fill the database with the example's data
  174.     fill_database(session)
  175.     # input()
  176.     # Test that our function works
  177.     test_join_version(session)
  178.  
  179.     # Clean
  180.     session.close()
  181.     engine.dispose()
  182.     drop_database()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement