Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # 1. Execute an INSERT statement that will insert the row with emp_name='dilbert'. The primary key column can be omitted so that it is generated automatically.
- ```
- import os
- from sqlalchemy import create_engine
- if os.path.exists("some.db"):
- os.remove("some.db")
- e = create_engine("sqlite:///some.db")
- e.execute("""
- CREATE TABLE employee (
- emp_id INTEGER PRIMARY KEY,
- emp_name VARCHAR
- )
- """)
- e.execute("""INSERT INTO employee(emp_name) VALUES ('dilbert')""")
- ```
- # 2. SELECT all rows from the employee table.
- from sqlalchemy import create_engine
- e = create_engine("sqlite:///some.db")
- result = e.execute("select * from employee")
- for row in result:
- print(row)
- ```
- (1, 'dilbert')
- ```
- # 1. Write a Table construct corresponding to this CREATE TABLE statement.
- ```
- from sqlalchemy import MetaData
- from sqlalchemy import Table, Column, ForeignKey
- from sqlalchemy import Integer, DateTime, Unicode
- metadata = MetaData()
- user_table = Table('network', metadata,
- Column('network_id', Integer, primary_key=True),
- Column('created_at', DateTime, nullable=False),
- Column('owner_id', Integer, ForeignKey('user.id')),
- Column('name', Unicode(100), nullable=False)
- )
- ```
- # 2. Then emit metadata.create_all(), which will emit CREATE TABLE for this table (it will skip those that already exist).
- ```
- from sqlalchemy import MetaData, create_engine
- from sqlalchemy import Table, Column, ForeignKey
- from sqlalchemy import Integer, DateTime, Unicode, String
- e = create_engine("sqlite:///some.db")
- metadata = MetaData()
- user_table = Table('user', metadata,
- Column('id', Integer, primary_key=True),
- Column('name', String),
- Column('fullname', String)
- )
- network_table = Table('network', metadata,
- Column('network_id', Integer, primary_key=True),
- Column('created_at', DateTime, nullable=False),
- Column('owner_id', Integer, ForeignKey('user.id')),
- Column('name', Unicode(100), nullable=False)
- )
- metadata.create_all(e)
- ```
- # 1. Using 'metadata2', reflect the "network" table in the same way we just did 'user', then display the columns (or bonus, display just the column names)
- from sqlalchemy import MetaData, create_engine
- from sqlalchemy import Table
- e = create_engine("sqlite:///some.db")
- metadata = MetaData()
- user_reflected = Table('network', metadata, autoload=True, autoload_with=e)
- print(user_reflected.c)
- ```
- ['network.network_id', 'network.created_at', 'network.owner_id', 'network.name']
- ```
- from sqlalchemy import MetaData, create_engine
- from sqlalchemy import Table
- e = create_engine("sqlite:///some.db")
- metadata = MetaData()
- user_reflected = Table('network', metadata, autoload=True, autoload_with=e)
- for col in user_reflected.columns:
- print(col.name)
- ```
- network_id
- created_at
- owner_id
- name
- ```
- # 2. Using "inspector", print a list of all table names that include a column called "story_id"
- from sqlalchemy import inspect, create_engine
- e = create_engine("sqlite:///some.db")
- res = []
- for table in inspect(e).get_table_names():
- for col in inspect(e).get_columns(table):
- if col['name'] == 'story_id':
- res.append(table)
- print(res)
- ```
- ['published', 'story']
- ```
- # Produce these expressions using "user_table.c.fullname", "user_table.c.id", and "user_table.c.username": 1. user.fullname = 'ed' 2. user.fullname = 'ed' AND user.id > 5 3. user.username = 'edward' OR (user.fullname = 'ed' AND user.id > 5)
- ```
- from sqlalchemy import MetaData, Table, Column, String, Integer, and_, or_
- from sqlalchemy import create_engine
- metadata = MetaData()
- user_table = Table('user', metadata,
- Column('id', Integer, primary_key=True),
- Column('username', String(50)),
- Column('fullname', String(50))
- )
- engine = create_engine("sqlite://")
- metadata.create_all(engine)
- print(user_table.c.fullname == 'ed')
- print(and_(user_table.c.fullname == 'ed', user_table.c.id > 5))
- print(or_(user_table.c.fullname == 'edward', and_(user_table.c.fullname == 'ed', user_table.c.id > 5)))
- ```
- # 1. use user_table.insert() and "r = conn.execute()" to emit this statement: INSERT INTO user (username, fullname) VALUES ('dilbert', 'Dilbert Jones') 2. What is the value of 'user.id' for the above INSERT statement? 3. Using "select([user_table])", execute this SELECT: SELECT id, username, fullname FROM user WHERE username = 'wendy' OR username = 'dilbert' ORDER BY fullname
- from sqlalchemy import MetaData, Table, Column, String, Integer, or_, select
- from sqlalchemy import create_engine
- metadata = MetaData()
- user_table = Table('user', metadata,
- Column('id', Integer, primary_key=True),
- Column('username', String(50)),
- Column('fullname', String(50))
- )
- engine = create_engine("sqlite://")
- metadata.create_all(engine)
- conn = engine.connect()
- conn.execute(user_table.insert(), [{'username': 'dilbert', 'fullname': 'Dilbert Jones'}])
- print(conn.execute(select([user_table.c.id])).fetchall()[0][0])
- print(conn.execute(
- select([user_table.c.id, user_table.c.username, user_table.c.fullname]).where(
- or_(user_table.c.username == 'wendy', user_table.c.username == 'dilbert')).order_by(
- user_table.c.fullname)).fetchall())
- ```
- 1
- [(1, 'dilbert', 'Dilbert Jones')]
- ```
- # Produce this SELECT: SELECT fullname, email_address FROM user JOIN address ON user.id = address.user_id WHERE username='ed' ORDER BY email_address
- from sqlalchemy import MetaData, Table, Column, String, Integer, or_, select, ForeignKey
- from sqlalchemy import create_engine
- metadata = MetaData()
- address_table = Table("address", metadata,
- Column('id', Integer, primary_key=True),
- Column('user_id', Integer, ForeignKey('user.id'),
- nullable=False),
- Column('email_address', String(100), nullable=False)
- )
- user_table = Table('user', metadata,
- Column('id', Integer, primary_key=True),
- Column('username', String(50)),
- Column('fullname', String(50))
- )
- engine = create_engine("sqlite://")
- metadata.create_all(engine)
- conn = engine.connect()
- conn.execute(address_table.insert(), [
- {"user_id": 1, "email_address": "[email protected]"},
- {"user_id": 1, "email_address": "[email protected]"},
- {"user_id": 2, "email_address": "[email protected]"},
- {"user_id": 3, "email_address": "[email protected]"},
- ])
- conn.execute(user_table.insert(),
- [{'username': 'ed', 'fullname': 'Edward'}, {'username': 'dilbert', 'fullname': 'Dilbert Jones'}])
- print(conn.execute(
- select([user_table.c.fullname, address_table.c.email_address])
- .select_from(user_table.join(address_table))
- .where(user_table.c.username == 'ed')
- .order_by(address_table.c.email_address)
- ).fetchall())
- ```
- [('Edward', '[email protected]'), ('Edward', '[email protected]')]
- ```
- # 1. Execute this UPDATE - keep the "result" that's returned UPDATE user SET fullname='Ed Jones' where username='ed' 2. how many rows did the above statement update? 3. Tricky bonus! Combine update() along with select().as_scalar() to execute this UPDATE: UPDATE user SET fullname=fullname || (select email_address FROM address WHERE user_id=user.id) WHERE username IN ('jack', 'wendy')
- from sqlalchemy import MetaData, Table, Column, String, Integer, or_, select, ForeignKey, update, func
- from sqlalchemy import create_engine
- metadata = MetaData()
- address_table = Table("address", metadata,
- Column('id', Integer, primary_key=True),
- Column('user_id', Integer, ForeignKey('user.id'),
- nullable=False),
- Column('email_address', String(100), nullable=False)
- )
- user_table = Table('user', metadata,
- Column('id', Integer, primary_key=True),
- Column('username', String(50)),
- Column('fullname', String(50))
- )
- engine = create_engine("sqlite://")
- metadata.create_all(engine)
- conn = engine.connect()
- conn.execute(address_table.insert(), [
- {"user_id": 1, "email_address": "[email protected]"},
- {"user_id": 1, "email_address": "[email protected]"},
- {"user_id": 2, "email_address": "[email protected]"},
- {"user_id": 3, "email_address": "[email protected]"},
- ])
- conn.execute(user_table.insert(),
- [{'username': 'ed', 'fullname': 'Edward'}, {'username': 'dilbert', 'fullname': 'Dilbert Jones'}])
- result = user_table.update().values(fullname="Ed Jones").where(user_table.c.username == 'ed')
- res = conn.execute(result)
- print(res.rowcount)
- address_sel = select([
- func.count(address_table.c.id)
- ]).where(user_table.c.id == address_table.c.user_id)
- result = user_table.update().values(fullname=user_table.c.fullname + address_sel.as_scalar()).where(
- user_table.c.username.in_({'jack', 'ed'}))
- res = conn.execute(result)
- print(res.rowcount)
- ```
- 1
- 1
- ```
- # 1. Create a class/mapping for this table, call the class Network CREATE TABLE network (network_id INTEGER PRIMARY KEY, name VARCHAR(100) NOT NULL) 2. emit Base.metadata.create_all(engine) to create the table 3. commit a few Network objects to the database: Network(name='net1'), Network(name='net2')
- ```
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy import Column, Integer, String, Unicode, create_engine
- from sqlalchemy.orm import Session
- Base = declarative_base()
- class Network(Base):
- __tablename__ = 'network'
- network_id = Column(Integer, primary_key=True)
- name = Column(Unicode(100), nullable=False)
- def __repr__(self):
- return "<Network(%r, %r)>" % (
- self.network_id, self.name
- )
- engine = create_engine('sqlite://')
- Base.metadata.create_all(engine)
- session = Session(bind=engine)
- session.add_all([
- Network(name='net1'),
- Network(name='net2')
- ])
- session.commit()
- ```
- # 1. Produce a Query object representing the list of "fullname" values for all User objects in alphabetical order. 2. call .all() on the query to make sure it works! 3. build a second Query object from the first that also selects only User rows with the name "mary" or "ed". 4. return only the second row of the Query from #3.
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy import Column, Integer, String, Unicode, create_engine, and_
- from sqlalchemy.orm import Session
- Base = declarative_base()
- class Network(Base):
- __tablename__ = 'network'
- network_id = Column(Integer, primary_key=True)
- name = Column(Unicode(100), nullable=False)
- def __repr__(self):
- return "<Network(%r, %r)>" % (
- self.network_id, self.name
- )
- class User(Base):
- __tablename__ = 'user'
- id = Column(Integer, primary_key=True)
- name = Column(String)
- fullname = Column(String)
- def __repr__(self):
- return "<User(%r, %r)>" % (
- self.name, self.fullname
- )
- engine = create_engine('sqlite://')
- Base.metadata.create_all(engine)
- session = Session(bind=engine)
- session.add_all([
- Network(name='net1'),
- Network(name='net2'),
- User(name='wendy', fullname='Wendy Weathersmith'),
- User(name='mary', fullname='Mary Contrary'),
- User(name='fred', fullname='Fred Flinstone'),
- User(name='ed', fullname='Edward Jones')
- ])
- session.commit()
- print(session.query(User.fullname).filter(User.name.in_(['mary', 'fred'])).order_by(User.fullname).all()[1])
- ```
- ('Mary Contrary',)
- ```
- # 1. Run this SQL JOIN: SELECT user.name, address.email_address FROM user JOIN address ON user.id=address.user_id WHERE address.email_address='[email protected]' 2. Tricky Bonus! Select all pairs of distinct user names. Hint: "... ON user_alias1.name < user_alias2.name"
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy import Column, Integer, String, Unicode, create_engine, and_, ForeignKey
- from sqlalchemy.orm import Session, relationship, aliased
- Base = declarative_base()
- class Network(Base):
- __tablename__ = 'network'
- network_id = Column(Integer, primary_key=True)
- name = Column(Unicode(100), nullable=False)
- def __repr__(self):
- return "<Network(%r, %r)>" % (
- self.network_id, self.name
- )
- class User(Base):
- __tablename__ = 'user'
- id = Column(Integer, primary_key=True)
- name = Column(String)
- fullname = Column(String)
- def __repr__(self):
- return "<User(%r, %r)>" % (
- self.name, self.fullname
- )
- class Address(Base):
- __tablename__ = 'address'
- id = Column(Integer, primary_key=True)
- email_address = Column(String, nullable=False)
- user_id = Column(Integer, ForeignKey('user.id'))
- user = relationship("User", backref="addresses")
- def __repr__(self):
- return "<Address(%r)>" % self.email_address
- engine = create_engine('sqlite://')
- Base.metadata.create_all(engine)
- session = Session(bind=engine)
- session.add_all([
- Network(name='net1'),
- Network(name='net2'),
- User(name='wendy', fullname='Wendy Weathersmith'),
- User(name='mary', fullname='Mary Contrary'),
- User(name='fred', fullname='Fred Flinstone'),
- User(name='ed', fullname='Edward Jones'),
- Address(email_address='[email protected]', user_id=0),
- Address(email_address='[email protected]', user_id=1),
- Address(email_address='[email protected]', user_id=2)
- ])
- session.commit()
- print(session.query(User.name, Address.email_address).join(Address).filter(
- Address.email_address == '[email protected]').all())
- a1, a2 = aliased(User), aliased(User)
- print(session.query(a1.name, a2.name).outerjoin().filter(
- a1.name < a2.name).all())
- ```
- [('wendy', '[email protected]')]
- [('mary', 'wendy'), ('fred', 'wendy'), ('fred', 'mary'), ('ed', 'wendy'), ('ed', 'mary'), ('ed', 'fred')]
- ```
- # 1. Create a class called 'Account', with table "account": id = Column(Integer, primary_key=True) owner = Column(String(50), nullable=False) balance = Column(Numeric, default=0) 2. Create a class "Transaction", with table "transaction": * Integer primary key * numeric "amount" column * Integer "account_id" column with ForeignKey('account.id') 3. Add a relationship() on Transaction named "account", which refers to "Account", and has a backref called "transactions". 4. Create a database, create tables, then insert these objects: a1 = Account(owner='Jack Jones', balance=5000) a2 = Account(owner='Ed Rendell', balance=10000) Transaction(amount=500, account=a1) Transaction(amount=4500, account=a1) Transaction(amount=6000, account=a2) Transaction(amount=4000, account=a2) 5. Produce a report that shows: * account owner * account balance * summation of transaction amounts per account (should match balance) A column can be summed using func.sum(Transaction.amount)
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy import Column, Integer, String, Unicode, create_engine, and_, ForeignKey, Numeric, func
- from sqlalchemy.orm import Session, relationship, aliased
- Base = declarative_base()
- class Account(Base):
- __tablename__ = 'account'
- id = Column(Integer, primary_key=True)
- owner = Column(String(50), nullable=False)
- balance = Column(Numeric, default=0)
- def __repr__(self):
- return "<Account(%r, %r, %r)>" % (
- self.id, self.owner, self.balance
- )
- class Transaction(Base):
- __tablename__ = 'transaction'
- id = Column(Integer, primary_key=True)
- amount = Column(Numeric)
- account_id = Column(Integer, ForeignKey(Account.id), nullable=False)
- account = relationship('Account', backref="transactions")
- engine = create_engine('sqlite://')
- Base.metadata.create_all(engine)
- session = Session(bind=engine)
- a1, a2 = Account(owner='Jack Jones', balance=5000), Account(owner='Ed Rendell', balance=10000)
- session.add_all([
- a1,
- a2,
- Transaction(amount=500, account=a1),
- Transaction(amount=4500, account=a1),
- Transaction(amount=6000, account=a2),
- Transaction(amount=4000, account=a2)
- ])
- session.commit()
- accounts = session.query(Account).all()
- for account in accounts:
- owner = account.owner
- balance = account.balance
- summ = session.query(func.sum(Transaction.amount)).filter_by(account_id=account.id).all()[0][0]
- print("owner: " + str(owner))
- print("balance: " + str(balance))
- print("summation of transaction amounts per account: " + str(summ))
- ```
- owner: Jack Jones
- balance: 5000.0000000000
- summation of transaction amounts per account: 5000.0000000000
- owner: Ed Rendell
- balance: 10000.0000000000
- summation of transaction amounts per account: 10000.0000000000
- ```
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement