Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os
- import psycopg2.pool
- connection_pool = psycopg2.pool.ThreadedConnectionPool(0, 50,
- dbname='foobar',
- user='foobar',
- password='foobar',
- host=HOST,
- )
- class db_conn(object):
- def execute(self, query, params=()):
- self.cur.execute(query, params)
- def execute_fetch(self, query, params=()):
- self.cur.execute(query, params)
- return self.cur.fetchall()
- def execute_fetchone(self, query, params=()):
- self.cur.execute(query, params)
- return self.cur.fetchone()
- def execute_fetch_dict(self, query, params=()):
- self.cur.execute(query, params)
- rows = self.cur.fetchall()
- columns = [d[0] for d in self.cur.description]
- dict_rows = [dict(zip(columns, row)) for row in rows]
- return dict_rows
- def execute_fetchone_dict(self, query, params=()):
- self.cur.execute(query, params)
- row = self.cur.fetchone()
- if row:
- columns = [d[0] for d in self.cur.description]
- dict_row = dict(zip(columns, row))
- return dict_row
- def row_count(self):
- return self.cur.rowcount
- def __enter__(self):
- self.conn = connection_pool.getconn()
- self.cur = self.conn.cursor()
- return self
- def __exit__(self, type, value, traceback):
- if type is None:
- self.conn.commit()
- self.cur.close()
- self.conn.rollback()
- connection_pool.putconn(self.conn)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement