Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import argparse, sys
- from pprint import pprint
- try:
- import sqlalchemy
- except ImportError:
- print('you need to install sqlalchemy: run `pip install sqlalchemy`')
- sys.exit()
- try:
- import psycopg2
- except ImportError:
- print('you need to install psycopg2: run `pip install psycopg2`')
- sys.exit()
- from sqlalchemy import Column, String, Integer, ForeignKey, create_engine, desc
- from sqlalchemy.orm import sessionmaker, aliased
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy.exc import ProgrammingError
- # Table definitions
- Base = declarative_base()
- class Item(Base):
- id = Column(Integer, primary_key=True)
- __tablename__ = 'item'
- def __repr__(self):
- return 'Item(id={})'.format(self.id)
- class ItemVersion(Base):
- id = Column(Integer, primary_key=True)
- item_id = Column(Integer, ForeignKey('item.id'))
- version = Column(Integer)
- text = Column(String)
- __tablename__ = 'item_version'
- def __repr__(self):
- return 'ItemVersion(id={}, text={})'.format(self.id, self.text)
- def create_database(postgres_db):
- # Create the database
- engine_postgres = create_engine(postgres_db)
- SessionPostgres = sessionmaker(bind=engine_postgres)
- session = SessionPostgres()
- session.execute('commit')
- try:
- session.execute('create database ' + db_name)
- except ProgrammingError:
- # the database already exists
- session.execute('drop database ' + db_name)
- session.execute('create database ' + db_name)
- session.close()
- engine_postgres.dispose()
- def fill_database(session):
- for i in range(3):
- session.add(Item(id=i + 1))
- session.commit()
- for i, (item_id, version) in enumerate([
- (1, 0), (1, 1), (2, 0), (3, 0)]):
- session.add(ItemVersion(
- id=i + 1,
- item_id=item_id,
- version=version,
- text = 'item_{}_v{}'.format(item_id, version)
- ))
- session.commit()
- # This is the missing function!
- # ----------------- TODO -------------- #
- def join_version(session, query):
- sq = aliased(Item, query.subquery('sq'))
- sq2 = session.query(sq, ItemVersion) \
- .distinct(sq.id) \
- .join(ItemVersion) \
- .order_by(sq.id, desc(ItemVersion.version))
- # return sq2
- # TODO this doesn't work!
- # q = session.query(Item, ItemVersion) \
- # .select_from(sq2) \
- # .order_by(*query._order_by)
- return sq2
- # --------------- END TODO ------------ #
- def test_join_version(session):
- # This is the first query in Items
- item_query = session.query(Item) \
- .filter(Item.id != 2) \
- .order_by(desc(Item.id))
- # And this should join the ItemVersions to it
- result = join_version(session, item_query).all()
- # Here is what the results should be like
- # result = [(session.query(Item).filter(Item.id == 3).one(),
- # session.query(ItemVersion).filter(ItemVersion.id == 4).one()),
- # (session.query(Item).filter(Item.id == 1).one(),
- # session.query(ItemVersion).filter(ItemVersion.id == 2).one())]
- # Valid that it works
- try:
- # right number of elements
- assert len(result) == 2, 'wrong nb of items'
- # right type
- assert isinstance(result[0][0], Item), 'not instance of Item'
- assert isinstance(result[0][1], ItemVersion), 'not instance of ItemVersion'
- # right Items
- assert result[0][0].id == 3, 'wrong first Item'
- assert result[1][0].id == 1, 'wrong second Item'
- # right ItemVersions
- assert result[0][1].text == 'item_3_v0', 'wrong first ItemVersion'
- assert result[1][1].text == 'item_1_v1', 'wrong second ItemVersion'
- except AssertionError as e:
- print('Still does not work :(')
- print(e)
- print('results were:')
- pprint(result)
- else:
- print('You did it! You are my hero!')
- def drop_database():
- engine_postgres = create_engine(postgres_db)
- SessionPostgres = sessionmaker(bind=engine_postgres)
- session = SessionPostgres()
- session.execute('commit')
- session.execute('drop database ' + db_name)
- session.close()
- engine_postgres.dispose()
- if __name__ == '__main__':
- parser = argparse.ArgumentParser(
- description='code for my stackoverflow question')
- parser.add_argument('-H', '--host', default='localhost',
- help='postgresql host')
- parser.add_argument('-P', '--port', default='5432', help='postgresql port')
- parser.add_argument('-u', '--username', default='postgres',
- help='postgresql username')
- parser.add_argument('-p','--password', default='postgres',
- help='postgresql username', required=True)
- args = parser.parse_args()
- # Some constants for the database
- postgres_db = 'postgresql+psycopg2://{user}:{password}@{host}:{port}'.format(
- user=args.username,
- password=args.password,
- host=args.host,
- port=args.port
- )
- db_name = 'super_so_qtion'
- super_so_qtion_db = postgres_db + '/' + db_name
- # Creating the database
- create_database(postgres_db)
- engine = create_engine(super_so_qtion_db)
- Session = sessionmaker(bind=engine)
- session = Session()
- # Creating the tables
- Base.metadata.create_all(engine)
- # Fill the database with the example's data
- fill_database(session)
- # input()
- # Test that our function works
- test_join_version(session)
- # Clean
- session.close()
- engine.dispose()
- drop_database()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement