Guest User

Untitled

a guest
Jan 3rd, 2018
38
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 9.77 KB | None | 0 0
  1. ########################################################################################################################
  2. # File db-sqlite3.py
  3. # Purpose: Class to make working with SQLite3 a lot easier.
  4. #  Author: Dan Huckson, https://github.com/unodan
  5. ########################################################################################################################
  6.  
  7. import sqlite3
  8. import logging as lg
  9. from os.path import isfile, getsize
  10.  
  11.  
  12. class DBSQLite3:
  13.     def __init__(self, logName=None):
  14.         self.host = None
  15.         self.conn = None
  16.         self.cursor = None
  17.         self.dbName = None
  18.         self.dbUser = None
  19.         self.dbPassword = None
  20.         self.class_name = self.__class__.__name__
  21.  
  22.         if not logName:
  23.             logName = self.class_name + '.log'
  24.  
  25.         lg.basicConfig(filename=logName, format='%(levelname)s:%(name)s:%(asctime)s:%(message)s',
  26.             datefmt='%Y/%m/%d %I:%M:%S', level=lg.DEBUG)
  27.         lg.info(self.class_name + ':__init__:Object created')
  28.  
  29.     def use(self, name):
  30.         try:
  31.             if self.database_exist(name):
  32.                 if self.dbName != name:
  33.                     self.dbName = name
  34.                     self.conn = sqlite3.connect(self.dbName)
  35.                     self.cursor = self.conn.cursor()
  36.  
  37.                 lg.info(self.class_name + ':use:Successful')
  38.                 return True
  39.  
  40.             else:
  41.                 raise ValueError('Could not use database.', name)
  42.  
  43.         except Exception as err:
  44.             lg.error(self.class_name + ':use:' + str(err))
  45.             return False
  46.  
  47.     def dump(self, name):
  48.         try:
  49.             print("TODO")
  50.             return True
  51.  
  52.         except Exception as err:
  53.             lg.error(self.class_name + ':dump:' + str(err))
  54.             return False
  55.  
  56.     def close(self):
  57.         try:
  58.             self.cursor.close()
  59.             self.conn.close()
  60.             lg.info(self.class_name + ':close:Closed database:' + self.dbName)
  61.             return True
  62.  
  63.         except Exception as err:
  64.             lg.error(self.class_name + ':close:' + str(err))
  65.             return False
  66.  
  67.     def commit(self):
  68.         try:
  69.             self.conn.commit()
  70.             lg.info(self.class_name + ':commit:Successful')
  71.             return True
  72.  
  73.         except Exception as err:
  74.             lg.error(self.class_name + ':commit:' + str(err))
  75.             return False
  76.  
  77.     def connect(self, conn):
  78.         self.dbName = conn['database']
  79.         try:
  80.             self.conn = sqlite3.connect(self.dbName)
  81.             lg.info(self.class_name + ':connect:Successful')
  82.             self.cursor = self.conn.cursor()
  83.             return True
  84.  
  85.         except Exception as err:
  86.             lg.error(self.class_name + ':connect:' + str(err))
  87.             return False
  88.  
  89.     def query(self, sql, args=None):
  90.         try:
  91.             if not args:
  92.                 self.cursor.execute(sql)
  93.             else:
  94.                 self.cursor.execute(sql, args)
  95.  
  96.             lg.info(self.class_name + ':query:Successful:' + sql)
  97.             return True
  98.  
  99.         except Exception as err:
  100.             lg.error(self.class_name + ':query:'+str(err) + ':' + sql)
  101.             return False
  102.  
  103.     def fetchone(self):
  104.         try:
  105.             row = self.cursor.fetchone()
  106.             lg.info(self.class_name + ':fetchone:Successful')
  107.             return row
  108.  
  109.         except Exception as err:
  110.             lg.error(self.class_name + ':fetchone:'+str(err))
  111.             return False
  112.  
  113.     def fetchall(self):
  114.         try:
  115.             rows = self.cursor.fetchall()
  116.             lg.info(self.class_name + ':fetchall:Successful')
  117.             return rows
  118.  
  119.         except Exception as err:
  120.             lg.error(self.class_name + ':fetchall:'+str(err))
  121.             return False
  122.  
  123.     def drop_table(self, name):
  124.         try:
  125.             print("TODO")
  126.             return True
  127.  
  128.         except Exception as err:
  129.             lg.error(self.class_name + ':drop_table:' + str(err) + ':' + sql)
  130.             return False
  131.  
  132.     def create_table(self, name, sql):
  133.         sql = 'CREATE TABLE ' + name + ' ( ' + sql + ' );'
  134.         try:
  135.             self.cursor.execute(sql)
  136.             lg.info(self.class_name + ':create_table:Successful:' + sql)
  137.             return True
  138.  
  139.         except Exception as err:
  140.             lg.error(self.class_name + ':create_table:' + str(err) + ':' + sql)
  141.             return False
  142.  
  143.     def create_database(self, name):
  144.         try:
  145.             if not isfile(name):
  146.                 open(name, 'w').close()
  147.                 lg.info(self.class_name + ':create_database:Successful:Database:' + self.dbName)
  148.                 return True
  149.  
  150.             elif not getsize(name):
  151.                 lg.info(self.class_name + ':create_database:Successful:Database:' + self.dbName)
  152.                 return True
  153.  
  154.             else:
  155.                 with open(name, 'r') as fd:
  156.                     header = fd.read(100)
  157.  
  158.                     if header[:15] == 'SQLite format 3':
  159.                         raise ValueError('Database file already exists.', name)
  160.                     else:
  161.                         raise ValueError('File is not a SQLite3 database file.', name)
  162.  
  163.         except Exception as err:
  164.             lg.error(self.class_name + ':create_database:' + str(err))
  165.             return False
  166.  
  167.     def row_exist(self, table, id):
  168.         sql = 'SELECT id FROM ' + table + ' WHERE id=?;'
  169.         if self.query(sql, (id,)):
  170.             if self.fetchone():
  171.                 return True
  172.             else:
  173.                 return False
  174.  
  175.     def table_exist(self, name):
  176.         sql = 'SELECT * FROM sqlite_master WHERE type="table" AND name="'+name+'";'
  177.         try:
  178.             self.cursor.execute(sql)
  179.  
  180.             if self.cursor.fetchone():
  181.                 return True
  182.             else:
  183.                 return False
  184.  
  185.         except Exception as err:
  186.             lg.error(self.class_name + ':table_exist:' + str(err))
  187.             return False
  188.  
  189.     def database_exist(self, name):
  190.         try:
  191.             if not isfile(name):
  192.                 raise ValueError('File not found.', name)
  193.  
  194.             elif not getsize(name):
  195.                 return True
  196.  
  197.             elif getsize(name) < 100 and getsize(name):
  198.                 raise ValueError('1File is not a SQLite3 database file.', name)
  199.             else:
  200.                 with open(name, 'rb') as fd:
  201.                     header = fd.read(100)
  202.  
  203.                     if header[:15] == b'SQLite format 3':
  204.                         return True
  205.  
  206.             raise ValueError('2File is not a SQLite3 database file.', name)
  207.  
  208.         except Exception as err:
  209.             lg.error(self.class_name + ':database_exist:Failed:' + str(err))
  210.             return False
  211.  
  212.     def insert_row(self, table, row):
  213.         parts = ''
  214.         sql = 'INSERT INTO ' + table + ' ('
  215.         for f in row:
  216.             parts += (f + ',')
  217.         sql = sql + parts[:-1] + ') VALUES ('
  218.  
  219.         parts = ''
  220.         for f in row:
  221.             parts += '?,'
  222.         sql = sql + parts[:-1] + ');'
  223.  
  224.         data = []
  225.         for c in row:
  226.             data.append(row[c])
  227.  
  228.         try:
  229.             self.cursor.execute(sql, tuple(data))
  230.             lg.info(self.class_name + ':insert_row:Successful:' + sql)
  231.             return True
  232.  
  233.         except Exception as err:
  234.             lg.error(self.class_name + ':insert_row:'+str(err) + ':' + sql)
  235.             return False
  236.  
  237.     def update_row(self, table, row, id):
  238.         parts = ''
  239.         sql = 'UPDATE ' + table + ' SET '
  240.         for f in row:
  241.             parts += (f + '=?,')
  242.         sql = sql + parts[:-1] + ' WHERE id=?;'
  243.  
  244.         print(sql)
  245.  
  246.         data = []
  247.         for c in row:
  248.             data.append(row[c])
  249.         data.append(id)
  250.  
  251.         try:
  252.             self.cursor.execute(sql, tuple(data))
  253.             lg.info(self.class_name + ':update_row:Successful:' + sql)
  254.             return True
  255.  
  256.         except Exception as err:
  257.             lg.error(self.class_name + ':update_row:'+str(err) + ':' + sql)
  258.             return False
  259.  
  260.     def delete_row(self, table, id):
  261.         sql = 'DELETE FROM ' + table + ' WHERE id = ?;'
  262.         try:
  263.             self.cursor.execute(sql, (id,))
  264.             lg.info(self.class_name + ':delete_row:Successful:' + sql)
  265.             return True
  266.  
  267.         except Exception as err:
  268.             lg.error(self.class_name + ':delete_row:'+str(err) + ':' + sql)
  269.             return False
  270.  
  271.  
  272. def main():
  273.     appName = 'TestSQLite3'
  274.  
  275.     conn = {
  276.         'database': appName
  277.     }
  278.  
  279.     tables = {
  280.         'users1': 'id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, '
  281.                  'fName VARCHAR(30) NOT NULL, lName VARCHAR(30) NOT NULL',
  282.  
  283.         'users2': 'id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, '
  284.                  'fName VARCHAR(30) NOT NULL, lName VARCHAR(30) NOT NULL'
  285.     }
  286.  
  287.     dbName = conn['database']
  288.     db = DBSQLite3(appName + '.log')
  289.     db.connect(conn)
  290.  
  291.     if not db.database_exist(dbName):
  292.         db.create_database(dbName)
  293.  
  294.     if db.use(dbName):
  295.         print('Connected to database ({})'.format(dbName))
  296.     else:
  297.         print('Could not connected to database ({})'.format(dbName))
  298.  
  299.     for t in tables:
  300.         if not db.table_exist(t):
  301.             db.create_table(t, tables[t])
  302.  
  303.     row = {'fName':'Dan', 'lName':'Huckson'}
  304.     db.insert_row('users1', row)
  305.  
  306.     db.delete_row('users1', 7)
  307.  
  308.     row = {'fName':'Daniel', 'lName':'Huckson'}
  309.     db.update_row('users1', row, 1)
  310.  
  311.     db.query('UPDATE users1 SET fName=?,lName=? WHERE id=?', ('Danny', 'Huckson', 3))
  312.  
  313.     if db.query('SELECT * FROM `users1`;'):
  314.         for r in db.fetchall():
  315.             print('ROW:', r)
  316.  
  317.     if db.row_exist('users1', 5):
  318.         print('YES it is there')
  319.     else:
  320.         print('Nope not there')
  321.  
  322.     db.commit()
  323.     db.close()
  324.  
  325.  
  326. main()
Add Comment
Please, Sign In to add comment