Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import psycopg2
- from settings import DB_CONNECTION
- # Functions for reading scripts
- class ScriptReader(object):
- @staticmethod
- def get_script(path, *args):
- script_file = open(path, 'rb')
- text = script_file.read().format(*args)
- script_file.close()
- return text
- # Utils for messages
- class Messages(object):
- @staticmethod
- def print_message(msg):
- print '---> {}'.format(msg)
- # Redshift functions to send and retrieve data
- class RedshiftDataManager(object):
- @staticmethod
- def execute_update(con, cur, script):
- try:
- cur.execute(script)
- con.commit()
- result = True
- except:
- con.rollback()
- result = False
- finally:
- con.close()
- return result
- @staticmethod
- def execute_query(con, cur, script):
- try:
- cur.execute(script)
- con.commit()
- result = cur.fetchall()
- except:
- con.rollback()
- result = None
- finally:
- con.close()
- return result
- @staticmethod
- def get_conn_string(db_conn):
- return "dbname='{}' port='5439' user='{}' password='{}' host='{}'".format(
- db_conn['db_name'], db_conn['db_username'], db_conn['db_password'], db_conn['db_host'])
- @staticmethod
- def create_conn(conn_string):
- return psycopg2.connect(conn_string)
- @staticmethod
- def get_conn():
- return RedshiftDataManager.create_conn(
- RedshiftDataManager.get_conn_string(DB_CONNECTION))
- @staticmethod
- def run_update(script):
- return RedshiftDataManager.execute_update(
- RedshiftDataManager.get_conn(),
- RedshiftDataManager.get_conn().cursor(),
- script)
- @staticmethod
- def run_query(script):
- return RedshiftDataManager.execute_query(
- RedshiftDataManager.get_conn(),
- RedshiftDataManager.get_conn().cursor(),
- script)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement