Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
class DBStorageService(StorageService):
    """Storage service that runs SQL statements against PostgreSQL via psycopg2.

    One connection is opened in ``__init__`` and reused for every call. Each
    public operation (``create``/``get``/``update``/``delete``) opens a fresh
    ``DictCursor``, executes a single statement and commits. A failing
    statement is rolled back and the original psycopg2 error is re-raised.

    SQL may be written with ``?`` placeholders; they are rewritten to
    psycopg2's ``%s`` style before execution.
    """

    __version__ = 1

    # Host used for the initial connection and for reconnects alike.
    __dbhost = '127.0.0.1'

    # Connection settings and live handles. The class-level ``None`` defaults
    # keep ``__del__`` safe when ``__init__`` fails before they are assigned.
    __dbname = None
    __dbuser = None
    __dbpwd = None
    __connection = None
    __cursor = None

    def __init__(self, dbname: str, dbuser: str, dbpwd: str) -> None:
        """Register psycopg2 type adapters and open the database connection.

        Args:
            dbname: Name of the database to connect to.
            dbuser: User name used for authentication.
            dbpwd: Password used for authentication (never logged).
        """
        logging.info('Register extensions')
        psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
        psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
        logging.info('Create DBStorage object with params: dbname=%s, dbuser=%s', dbname, dbuser)
        self.__dbname = dbname
        self.__dbuser = dbuser
        self.__dbpwd = dbpwd
        super().__init__()
        psycopg2.extras.register_uuid()
        self.__connection = self.__connect()

    def __connect(self):
        """Open a new connection using the stored credentials and host."""
        return psycopg2.connect(
            database=self.__dbname,
            user=self.__dbuser,
            password=self.__dbpwd,
            host=self.__dbhost,
        )

    def create(self, sql: str, data: tuple, is_return_id: bool = False) -> Optional[str]:
        """Execute an inserting statement and commit it.

        Args:
            sql: Statement to run; ``?`` placeholders are accepted.
            data: Values bound to the placeholders.
            is_return_id: When true, the first column of the first row the
                statement returns is handed back to the caller.

        Returns:
            The fetched value when ``is_return_id`` is true and the statement
            produced a row, otherwise ``None``.

        Raises:
            DatabaseError: If the statement fails; the transaction is rolled
                back first.
        """
        logging.debug('Create database object.')
        new_id = self.__execute_sql(sql=sql, data=data, is_return_id=is_return_id)
        self.commit()
        return new_id

    def get(self, sql: str, data: Optional[tuple] = None) -> list:
        """Execute a query and return every row it produced.

        Args:
            sql: Query to run; ``?`` placeholders are accepted.
            data: Values bound to the placeholders, if any.

        Returns:
            The result of ``fetchall()`` on the ``DictCursor``.

        Raises:
            DatabaseError: If the query fails; the transaction is rolled back
                first.
        """
        logging.debug('Get database object.')
        self.__execute_sql(sql=sql, data=data)
        logging.debug("Do Fetch All")
        rows = self.__cursor.fetchall()
        self.commit()
        logging.debug("Fetch size rows: %s", len(rows))
        return rows

    def update(self, sql: str, data: tuple) -> None:
        """Execute an updating statement and commit it.

        Raises:
            DatabaseError: If the statement fails; the transaction is rolled
                back first.
        """
        logging.debug('Update database object.')
        self.__execute_sql(sql=sql, data=data)
        self.commit()

    def delete(self, sql: str, data: tuple) -> None:
        """Execute a deleting statement and commit it.

        Raises:
            DatabaseError: If the statement fails; the transaction is rolled
                back first.
        """
        logging.debug('Delete database object.')
        self.__execute_sql(sql=sql, data=data)
        self.commit()

    def __execute_sql(self, sql: str, data: Optional[tuple] = None, is_return_id: bool = False) -> Optional[str]:
        """Open a cursor, run one statement and optionally fetch a returned id.

        The cursor is stored on the instance so that ``get`` can fetch from it
        and ``commit`` can close it afterwards.

        Returns:
            The first column of the first returned row when ``is_return_id``
            is true and a row exists, otherwise ``None``.

        Raises:
            DatabaseError: Re-raised unchanged after logging and rollback.
        """
        logging.debug("__ExecuteSQL method called")
        # NOTE: a plain textual replace, so every "?" in the statement is
        # rewritten, including any inside string literals.
        sql = sql.replace("?", "%s")
        logging.debug("\nSQL: %s\nParameters: %s ", sql, data)
        logging.debug("Create cursor")
        try:
            self.__cursor = self.__connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
        except InterfaceError as e:
            # The connection is no longer usable: reconnect once with the same
            # settings as the initial connection and retry.
            logging.error(e)
            self.__connection = self.__connect()
            self.__cursor = self.__connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
        try:
            logging.debug("Execute SQL")
            self.__cursor.execute(sql, data)
            if is_return_id:
                row = self.__cursor.fetchone()
                # fetchone() yields None when the statement returned no row;
                # report that as "no id" instead of failing on None[0].
                return row[0] if row is not None else None
        except DatabaseError as err:
            logging.error("Database Error happened. Code: %s . Message: %s", err.pgcode, err.pgerror)
            logging.error(err)
            self.rollback()
            # Bare raise keeps the concrete exception class, pgcode/pgerror
            # and traceback that wrapping in a new DatabaseError would lose.
            raise
        return None

    def commit(self):
        """Close the current cursor, if any, and commit the transaction."""
        logging.debug("Commit method called")
        logging.debug("Close Cursor")
        # No cursor exists until the first statement has been executed.
        if self.__cursor is not None:
            self.__cursor.close()
        logging.debug("Commit Connection")
        self.__connection.commit()

    def rollback(self):
        """Roll back the current transaction on the connection."""
        logging.debug("Rollback method called")
        logging.debug("Do connection rollback")
        self.__connection.rollback()

    def __del__(self):
        """Close the cursor and the connection when the object is collected."""
        logging.info("__del__ method")
        if self.__cursor is not None:
            logging.debug("Cursor Close")
            self.__cursor.close()
        if self.__connection is not None:
            logging.debug("Connection Close")
            self.__connection.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement