Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # use execute like:
- for row in execute(conn, 'SELECT n FROM foo WHERE n = %(n)s', n=9):
- print(row['n'])
- def yielder(cur):
- cols = [col.name for col in cur.description or []]
- while True:
- row = cur.fetchone()
- if row is not None:
- yield dict(zip(cols, row))
- else:
- break
- cur.close()
- def execute(conn, sql, **kwargs):
- cur = conn.cursor()
- cur.execute(sql, kwargs)
- return yielder(cur)
- def named(s: str):
- return f'%({s})s'
- def execute_values(conn, sql, values, columns, **kwargs):
- cur = conn.cursor()
- template = f'({",".join(named(c) for c in columns)})'
- value_strs = ','.join(
- cur.mogrify(template, value).decode('utf-8') for value in values)
- if value_strs:
- cur.execute(sql.format(values=value_strs), **kwargs)
- return yielder(cur)
- else:
- cur.close()
- return iter([])
- # and the pytest fixtures
- @pytest.fixture(scope='session')
- def original_conn():
- with testing.postgresql.Postgresql(port=8421) as postgresql:
- print(f'connecting to: {postgresql.url()}')
- conn_ = psycopg2.connect(postgresql.url())
- yield conn_
- conn_.close()
- @pytest.fixture()
- def conn(original_conn):
- with original_conn:
- db.execute(original_conn, db.create_tables_sql)
- yield original_conn
- drop_schema = 'DROP SCHEMA public CASCADE'
- create_schema = 'CREATE SCHEMA public'
- with original_conn:
- db.execute(original_conn, drop_schema)
- db.execute(original_conn, create_schema)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement