Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import functools

import psycopg2
import pymysql
class Database:
    """Base class holding the connection settings shared by database backends.

    The ``connection_manager`` and ``execute`` methods defined here only raise
    ``NotImplementedError``; concrete backends override both.
    """

    def __init__(self, host_name, user_name, database_name, password, port):
        """Store the connection settings on the instance.

        Args:
            host_name: Host of the database server.
            user_name: User to authenticate as.
            database_name: Name of the database to connect to.
            password: Password for ``user_name``.
            port: Port the server listens on.
        """
        self.host_name = host_name
        self.user_name = user_name
        self.database_name = database_name
        self.password = password
        self.port = port

    def connection_manager(self, f=None):
        """Wrap ``f`` with connection setup/teardown (implemented by subclasses).

        ``f`` is optional here only so that the historical zero-argument call
        keeps raising ``NotImplementedError``; the overrides in this file take
        the callable to wrap as ``f``.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def execute(self, query):
        """Run ``query`` against the database (implemented by subclasses).

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError
class RedShift(Database):
    """``Database`` backend that connects through ``psycopg2`` (default port 5439)."""

    def __init__(self, host_name, user_name, database_name, password, port=5439):
        """Store the connection settings; ``port`` defaults to 5439."""
        super().__init__(host_name, user_name, database_name, password, port)

    def connection_manager(self, f):
        """Wrap ``f`` so it runs with a freshly opened connection and cursor.

        The returned wrapper opens a ``psycopg2`` connection, stores it and a
        new cursor on ``self.connection`` / ``self.cursor``, calls ``f`` with
        the wrapper's own arguments, and closes the cursor and connection
        afterwards whether or not ``f`` raised.

        Args:
            f: Callable to run while ``self.cursor`` is available.

        Returns:
            A callable with the same call signature as ``f`` that returns
            whatever ``f`` returns.
        """
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            # Connect outside the try block: if connecting fails there is
            # nothing to clean up, and the driver's error must not be masked
            # by a failure to close a cursor/connection that never existed.
            self.connection = psycopg2.connect(dbname=self.database_name,
                                               host=self.host_name,
                                               port=self.port,
                                               user=self.user_name,
                                               password=self.password)
            try:
                self.cursor = self.connection.cursor()
                try:
                    return f(*args, **kwargs)
                finally:
                    self.cursor.close()
            finally:
                self.connection.close()
        return wrapper

    def execute(self, query):
        """Run ``query`` on a fresh connection and return all fetched rows.

        Args:
            query: SQL text passed unchanged to ``cursor.execute``.

        Returns:
            The result of ``cursor.fetchall()`` for the executed query.
        """
        # connection_manager needs a bound instance, so it is applied at call
        # time; using it as a bare decorator in the class body would invoke it
        # without ``self`` and fail while the class is being defined.
        def run_query():
            self.cursor.execute(query)
            return self.cursor.fetchall()

        return self.connection_manager(run_query)()
class MySQL(Database):
    """``Database`` backend that connects through ``pymysql`` (default port 3306)."""

    def __init__(self, host_name, user_name, database_name, password, port=3306):
        """Store the connection settings; ``port`` defaults to 3306."""
        super().__init__(host_name, user_name, database_name, password, port)

    def connection_manager(self, f):
        """Wrap ``f`` so it runs with a freshly opened MySQL connection and cursor.

        The returned wrapper opens a ``pymysql`` connection (``utf8mb4``
        charset, ``DictCursor`` cursor class), stores it and a new cursor on
        ``self.connection`` / ``self.cursor``, calls ``f`` with the wrapper's
        own arguments, and commits if ``f`` returned normally. The cursor and
        connection are closed afterwards whether or not ``f`` raised.

        Args:
            f: Callable to run while ``self.cursor`` is available.

        Returns:
            A callable with the same call signature as ``f`` that returns
            whatever ``f`` returns.
        """
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            # Connect outside the try block: if connecting fails there is
            # nothing to clean up, and the driver's error must not be masked
            # by a failure to close a connection that never existed.
            self.connection = pymysql.connect(db=self.database_name,
                                              host=self.host_name,
                                              port=self.port,
                                              user=self.user_name,
                                              password=self.password,
                                              charset='utf8mb4',
                                              cursorclass=pymysql.cursors.DictCursor)
            try:
                self.cursor = self.connection.cursor()
                try:
                    result = f(*args, **kwargs)
                finally:
                    self.cursor.close()
                # Reached only when f succeeded; on error the connection is
                # closed without committing.
                self.connection.commit()
                return result
            finally:
                self.connection.close()
        return wrapper

    def execute(self, query):
        """Run ``query`` on a fresh connection and return all fetched rows.

        Args:
            query: SQL text passed unchanged to ``cursor.execute``.

        Returns:
            The result of ``cursor.fetchall()`` for the executed query.
        """
        # connection_manager needs a bound instance, so it is applied at call
        # time; using it as a bare decorator in the class body would invoke it
        # without ``self`` and fail while the class is being defined.
        def run_query():
            self.cursor.execute(query)
            return self.cursor.fetchall()

        return self.connection_manager(run_query)()
Add Comment
Please, Sign In to add comment