Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- ########################################################################################################################
- # File db-sqlite3.py
- # Purpose: Class to make working with SQLite3 a lot easier.
- # Author: Dan Huckson, https://github.com/unodan
- ########################################################################################################################
- import sqlite3
- import logging as lg
- from os.path import isfile, getsize
- class DBSQLite3:
- def __init__(self, logName=None):
- self.host = None
- self.conn = None
- self.cursor = None
- self.dbName = None
- self.dbUser = None
- self.dbPassword = None
- self.class_name = self.__class__.__name__
- if not logName:
- logName = self.class_name + '.log'
- lg.basicConfig(filename=logName, format='%(levelname)s:%(name)s:%(asctime)s:%(message)s',
- datefmt='%Y/%m/%d %I:%M:%S', level=lg.DEBUG)
- lg.info(self.class_name + ':__init__:Object created')
- def use(self, name):
- try:
- if self.database_exist(name):
- if self.dbName != name:
- self.dbName = name
- self.conn = sqlite3.connect(self.dbName)
- self.cursor = self.conn.cursor()
- lg.info(self.class_name + ':use:Successful')
- return True
- else:
- raise ValueError('Could not use database.', name)
- except Exception as err:
- lg.error(self.class_name + ':use:' + str(err))
- return False
- def dump(self, name):
- try:
- print("TODO")
- return True
- except Exception as err:
- lg.error(self.class_name + ':dump:' + str(err))
- return False
- def close(self):
- try:
- self.cursor.close()
- self.conn.close()
- lg.info(self.class_name + ':close:Closed database:' + self.dbName)
- return True
- except Exception as err:
- lg.error(self.class_name + ':close:' + str(err))
- return False
- def commit(self):
- try:
- self.conn.commit()
- lg.info(self.class_name + ':commit:Successful')
- return True
- except Exception as err:
- lg.error(self.class_name + ':commit:' + str(err))
- return False
- def connect(self, conn):
- self.dbName = conn['database']
- try:
- self.conn = sqlite3.connect(self.dbName)
- lg.info(self.class_name + ':connect:Successful')
- self.cursor = self.conn.cursor()
- return True
- except Exception as err:
- lg.error(self.class_name + ':connect:' + str(err))
- return False
- def query(self, sql, args=None):
- try:
- if not args:
- self.cursor.execute(sql)
- else:
- self.cursor.execute(sql, args)
- lg.info(self.class_name + ':query:Successful:' + sql)
- return True
- except Exception as err:
- lg.error(self.class_name + ':query:'+str(err) + ':' + sql)
- return False
- def fetchone(self):
- try:
- row = self.cursor.fetchone()
- lg.info(self.class_name + ':fetchone:Successful')
- return row
- except Exception as err:
- lg.error(self.class_name + ':fetchone:'+str(err))
- return False
- def fetchall(self):
- try:
- rows = self.cursor.fetchall()
- lg.info(self.class_name + ':fetchall:Successful')
- return rows
- except Exception as err:
- lg.error(self.class_name + ':fetchall:'+str(err))
- return False
- def drop_table(self, name):
- try:
- print("TODO")
- return True
- except Exception as err:
- lg.error(self.class_name + ':drop_table:' + str(err) + ':' + sql)
- return False
- def create_table(self, name, sql):
- sql = 'CREATE TABLE ' + name + ' ( ' + sql + ' );'
- try:
- self.cursor.execute(sql)
- lg.info(self.class_name + ':create_table:Successful:' + sql)
- return True
- except Exception as err:
- lg.error(self.class_name + ':create_table:' + str(err) + ':' + sql)
- return False
- def create_database(self, name):
- try:
- if not isfile(name):
- open(name, 'w').close()
- lg.info(self.class_name + ':create_database:Successful:Database:' + self.dbName)
- return True
- elif not getsize(name):
- lg.info(self.class_name + ':create_database:Successful:Database:' + self.dbName)
- return True
- else:
- with open(name, 'r') as fd:
- header = fd.read(100)
- if header[:15] == 'SQLite format 3':
- raise ValueError('Database file already exists.', name)
- else:
- raise ValueError('File is not a SQLite3 database file.', name)
- except Exception as err:
- lg.error(self.class_name + ':create_database:' + str(err))
- return False
- def row_exist(self, table, id):
- sql = 'SELECT id FROM ' + table + ' WHERE id=?;'
- if self.query(sql, (id,)):
- if self.fetchone():
- return True
- else:
- return False
- def table_exist(self, name):
- sql = 'SELECT * FROM sqlite_master WHERE type="table" AND name="'+name+'";'
- try:
- self.cursor.execute(sql)
- if self.cursor.fetchone():
- return True
- else:
- return False
- except Exception as err:
- lg.error(self.class_name + ':table_exist:' + str(err))
- return False
- def database_exist(self, name):
- try:
- if not isfile(name):
- raise ValueError('File not found.', name)
- elif not getsize(name):
- return True
- elif getsize(name) < 100 and getsize(name):
- raise ValueError('1File is not a SQLite3 database file.', name)
- else:
- with open(name, 'rb') as fd:
- header = fd.read(100)
- if header[:15] == b'SQLite format 3':
- return True
- raise ValueError('2File is not a SQLite3 database file.', name)
- except Exception as err:
- lg.error(self.class_name + ':database_exist:Failed:' + str(err))
- return False
- def insert_row(self, table, row):
- parts = ''
- sql = 'INSERT INTO ' + table + ' ('
- for f in row:
- parts += (f + ',')
- sql = sql + parts[:-1] + ') VALUES ('
- parts = ''
- for f in row:
- parts += '?,'
- sql = sql + parts[:-1] + ');'
- data = []
- for c in row:
- data.append(row[c])
- try:
- self.cursor.execute(sql, tuple(data))
- lg.info(self.class_name + ':insert_row:Successful:' + sql)
- return True
- except Exception as err:
- lg.error(self.class_name + ':insert_row:'+str(err) + ':' + sql)
- return False
- def update_row(self, table, row, id):
- parts = ''
- sql = 'UPDATE ' + table + ' SET '
- for f in row:
- parts += (f + '=?,')
- sql = sql + parts[:-1] + ' WHERE id=?;'
- print(sql)
- data = []
- for c in row:
- data.append(row[c])
- data.append(id)
- try:
- self.cursor.execute(sql, tuple(data))
- lg.info(self.class_name + ':update_row:Successful:' + sql)
- return True
- except Exception as err:
- lg.error(self.class_name + ':update_row:'+str(err) + ':' + sql)
- return False
- def delete_row(self, table, id):
- sql = 'DELETE FROM ' + table + ' WHERE id = ?;'
- try:
- self.cursor.execute(sql, (id,))
- lg.info(self.class_name + ':delete_row:Successful:' + sql)
- return True
- except Exception as err:
- lg.error(self.class_name + ':delete_row:'+str(err) + ':' + sql)
- return False
- def main():
- appName = 'TestSQLite3'
- conn = {
- 'database': appName
- }
- tables = {
- 'users1': 'id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, '
- 'fName VARCHAR(30) NOT NULL, lName VARCHAR(30) NOT NULL',
- 'users2': 'id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, '
- 'fName VARCHAR(30) NOT NULL, lName VARCHAR(30) NOT NULL'
- }
- dbName = conn['database']
- db = DBSQLite3(appName + '.log')
- db.connect(conn)
- if not db.database_exist(dbName):
- db.create_database(dbName)
- if db.use(dbName):
- print('Connected to database ({})'.format(dbName))
- else:
- print('Could not connected to database ({})'.format(dbName))
- for t in tables:
- if not db.table_exist(t):
- db.create_table(t, tables[t])
- row = {'fName':'Dan', 'lName':'Huckson'}
- db.insert_row('users1', row)
- db.delete_row('users1', 7)
- row = {'fName':'Daniel', 'lName':'Huckson'}
- db.update_row('users1', row, 1)
- db.query('UPDATE users1 SET fName=?,lName=? WHERE id=?', ('Danny', 'Huckson', 3))
- if db.query('SELECT * FROM `users1`;'):
- for r in db.fetchall():
- print('ROW:', r)
- if db.row_exist('users1', 5):
- print('YES it is there')
- else:
- print('Nope not there')
- db.commit()
- db.close()
- main()
Add Comment
Please, Sign In to add comment