Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """
- DB HELPER module
- Just use it like:
- >>> from db import DbHelper
- >>> db = DbHelper()
- SELECT example - 1 (plain query; query methods are coroutines, so await them)
- >>> query = "SELECT * FROM table"
- >>> result = await db.fetchone(query)
- SELECT example - 2 (query with data)
- >>> query = "SELECT * FROM table WHERE some_column = %s"
- >>> data = "your param"
- >>> result = await db.fetchone(query, data)
- To fetch several rows from the same query, keep the cursor open
- by passing open_cursor=True on the first call:
- >>> result = await db.fetchone(query, data, open_cursor=True)
- then pass next_result=True on each following call:
- >>> result = await db.fetchone(query, data, next_result=True)
- Don't forget to close the cursor when you are done:
- >>> await db.close_cursor()
- """
- import pymysql
- import time
- import config
- import logging
- logger = logging.getLogger(__name__)
class DbHelper:
    """
    Thin wrapper around a single pymysql connection.

    It opens the connection on construction and exposes helpers that run
    queries, fetch rows, commit and close, so calling code stays free of
    cursor/connection bookkeeping.

    The query helpers are declared ``async`` and must be awaited, but the
    pymysql calls they make are ordinary synchronous calls.

    The instance can be used with ``with`` or ``async with``; both close the
    cursor and the connection on exit.

    Attributes:
        connection: the pymysql connection, or None when connecting failed
            or after the helper was closed.
        cursor: the long-lived cursor opened via ``get_cursor`` /
            ``fetchone(..., open_cursor=True)``, or None when none is open.
    """

    def __init__(self):
        logger.debug("db: creating DbHelper instance")
        self.connection = self.connect()
        self.cursor = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # close() is a coroutine function; calling it here without awaiting
        # would only create a coroutine object and close nothing, so the
        # synchronous helper is used instead.
        self._close_sync()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self._close_sync()

    @staticmethod
    def connect():
        """
        Open a connection using ``config.db_settings``.

        Makes up to three attempts, pausing one second between failed
        attempts.

        Returns:
            The pymysql connection, or None if every attempt failed.
        """
        attempts = 3
        for attempt in range(1, attempts + 1):
            try:
                connection = pymysql.connect(**config.db_settings)
            except pymysql.Error as e:
                logger.error(f"db: can't connect: \n{e}")
                # No point in waiting once there is no attempt left.
                if attempt < attempts:
                    time.sleep(1)
            else:
                logger.debug('db: connection established')
                return connection
        return None

    async def get_cursor(self):
        """
        Open a long-lived cursor and remember it in ``self.cursor``.

        Returns:
            The cursor stored in ``self.cursor``. This is None when there is
            no connection; when opening fails it is whatever ``self.cursor``
            held before the call (None unless a cursor was already open).
        """
        if self.connection is None:
            logger.error('db: cannot open cursor, no connection')
            return self.cursor
        try:
            self.cursor = self.connection.cursor()
        except pymysql.Error as e:
            logger.error(f'db error with cursor open: \n{e}')
        else:
            logger.debug('db: cursor opened')
        return self.cursor

    async def close_cursor(self):
        """Close the long-lived cursor, if one is open."""
        self._release_cursor()

    async def fetchone(self, *args, **kwargs):
        """
        Run a query and return one row.

        Args:
            *args: passed straight to ``cursor.execute`` (query, then
                optional parameters).
            **kwargs: ``open_cursor=True`` opens and keeps a cursor for later
                calls; ``next_result=True`` fetches the next row from the
                already open cursor without executing the query again.

        Returns:
            The row returned by ``cursor.fetchone()``, or None when there is
            no connection, no query was given, or no cursor could be opened.
        """
        if not (self.connection and args):
            return None

        if self.cursor is not None:
            # A kept cursor exists: re-run the query unless the caller only
            # wants the next row of the previous result set.
            if not kwargs.get('next_result'):
                self.cursor.execute(*args)
            return self.cursor.fetchone()

        if kwargs.get('open_cursor'):
            cursor = await self.get_cursor()
            if cursor is None:
                return None
            # pymysql cursors are synchronous: their results are plain
            # values and must not be awaited.
            cursor.execute(*args)
            return cursor.fetchone()

        # One-off query: use a temporary cursor that is closed right away.
        with self.connection.cursor() as cursor:
            cursor.execute(*args)
            return cursor.fetchone()

    async def fetchall(self, *args, **kwargs):
        """
        Run a query on a temporary cursor and return all rows.

        Args:
            *args: passed straight to ``cursor.execute``.
            **kwargs: accepted for signature compatibility; not used.

        Returns:
            The result of ``cursor.fetchall()``, or None when there is no
            connection or no query was given.
        """
        if not (self.connection and args):
            return None
        with self.connection.cursor() as cursor:
            cursor.execute(*args)
            return cursor.fetchall()

    async def update(self, *args, no_commit=None, **kwargs):
        """
        Execute an UPDATE statement.

        Args:
            *args: passed straight to ``cursor.execute``.
            no_commit: when truthy, skip the commit after executing.
            **kwargs: accepted for signature compatibility; not used.

        Returns:
            True if the statement was executed (and committed unless
            ``no_commit``); False on a pymysql error or when nothing could
            be executed (no connection or no query).
        """
        return self._run_write('update', args, no_commit)

    async def insert(self, *args, no_commit=None, **kwargs):
        """
        Execute an INSERT statement.

        Args:
            *args: passed straight to ``cursor.execute``.
            no_commit: when truthy, skip the commit after executing.
            **kwargs: accepted for signature compatibility; not used.

        Returns:
            True if the statement was executed (and committed unless
            ``no_commit``); False on a pymysql error or when nothing could
            be executed (no connection or no query).
        """
        return self._run_write('insert', args, no_commit)

    async def commit(self):
        """Commit the current transaction if a connection exists."""
        if self.connection:
            self.connection.commit()
            logger.debug('db: commit successful')

    async def close(self):
        """Close the kept cursor (if any) and the connection (if any)."""
        self._close_sync()

    def _run_write(self, action, args, no_commit):
        """Shared body of update/insert: execute, optionally commit, report success."""
        if not (self.connection and args):
            # Nothing was executed, so this must not be reported as success.
            logger.error(f'db: {action} skipped (no connection or no query)')
            return False
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(*args)
            if not no_commit:
                self.connection.commit()
        except pymysql.Error as e:
            logger.error(f'db: {action} failed (pysql reason) \n{e}')
            return False
        else:
            logger.debug(f'db: {action} successful')
            return True

    def _release_cursor(self):
        """Close and forget the kept cursor; return True if one was closed."""
        if self.cursor is None:
            return False
        cursor = self.cursor
        # Drop the reference first so a closed cursor is never reused by
        # fetchone, even if close() raises.
        self.cursor = None
        cursor.close()
        return True

    def _close_sync(self):
        """Synchronously close the cursor and the connection; safe to call twice."""
        try:
            if self._release_cursor():
                logger.debug('db: cursor closed')
        finally:
            if self.connection is not None:
                connection = self.connection
                self.connection = None
                connection.close()
                logger.debug('db: connection closed')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement