Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # Created by Brandon
- import psycopg2 as I_sql
- class SQL_Query:
- con = None
- noFetch = ['DELETE', 'CREATE', 'UPDATE', 'INSERT']
- def __init__(self, con):
- self.con = con
- def execute(self, sql: str):
- result = False
- cur = None
- print(sql)
- if self.con is not None:
- try:
- cur = self.con.cursor()
- cur.execute(sql)
- hasNoFetch = False
- for word in self.noFetch:
- if word in sql:
- hasNoFetch = True
- break
- if not hasNoFetch:
- result = cur.fetchall()
- except (Exception, I_sql.DatabaseError) as error:
- print("SQL Error Occured >> ")
- print(error)
- finally:
- if cur is not None:
- cur.close()
- return result
- class SQL:
- con = None
- host = ""
- username = ""
- password = ""
- databaseTarget = ""
- schemaTarget = ""
- def __init__(self):
- pass
- def connect(self, host: str, user: str, password: str, dbName = ""):
- self.host = host
- self.username = user
- self.password = password
- print("Connecting to PostgreSQL")
- try:
- self.con = I_sql.connect('user='+user+' password='+password)
- print("Connected to PostgreSQL")
- self.con.set_isolation_level(I_sql.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
- if dbName != "":
- self.useDatabase(dbName)
- except (Exception, I_sql.DatabaseError) as error:
- print("SQL Connection Error Occured >> ")
- print(error)
- return self.con
- def close(self):
- if self.con is not None:
- self.con.close()
- print("Disconnecting from PostgreSQL")
- def query(self, sql: str):
- if self.con is not None:
- if self.con.closed == 0:
- query = SQL_Query(self.con)
- return query.execute(sql)
- elif self.con.closed == 1:
- print("Failed to Execute: " + sql + "\nError: No SQL Connection.")
- return False
- def useDatabase(self, database: str):
- self.createDatabase(database)
- print("Connecting to PostgreSQL > Database > " + database)
- try:
- self.con = I_sql.connect('dbname='+ database +' user='+ self.username +' password='+ self.password)
- print("Connected to PostgreSQL > Database > " + database)
- self.con.set_isolation_level(I_sql.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
- self.databaseTarget = database
- except (Exception, I_sql.DatabaseError) as error:
- print("SQL Connection Error Occured >> ")
- print(error)
- return self.con
- def useSchema(self, schema: str):
- self.schemaTarget = schema
- return self.con
- def get(self, table: str, where: list):
- whereClasues = ' '.join(where)
- return self.query("SELECT * FROM "+ self.__targetTable(table) + ((" WHERE " + whereClasues) if len(where) != 0 else ""))
- def deleteTableRow(self, table: str, where: list):
- whereClasues = ' AND '.join(where)
- return self.query("DELETE FROM " + self.__targetTable(table) + " WHERE " + whereClasues)
- def deleteTable(self, table: str):
- return self.query("DROP TABLE IF EXISTS " + self.__targetTable(table))
- def deleteSchema(self, schema: str):
- if self.schemaTarget == schema:
- self.schemaTarget = ""
- return self.query("DROP SCHEMA IF EXISTS " + schema)
- def deleteDatabase(self, database: str):
- self.connect(self.host, self.username, self.password)
- return self.query("DROP DATABASE IF EXISTS " + database)
- def insert(self, table: str, fields: dict):
- field_keys = ', '.join(fields.keys())
- _fields = '\',\''.join(fields.values())
- return self.query("INSERT INTO " + self.__targetTable(table) + "("+ field_keys +") VALUES ('"+ _fields +"')")
- def createTable(self, name: str, fields: list):
- field_keys = ', '.join(fields)
- return self.query("CREATE TABLE IF NOT EXISTS " + self.__targetTable(name) + " ("+ field_keys + ")")
- def createSchema(self, name: str):
- if self.databaseTarget is not "":
- self.useSchema(name)
- if not self.checkIfSchemaExists(name):
- return self.query("CREATE SCHEMA " + name)
- return False
- def createDatabase(self, name: str):
- if not self.checkIfDatabaseExists(name):
- return self.query("CREATE DATABASE " + name)
- return False
- def update(self, table: str, where: list, fields: dict):
- whereClasues = ' AND '.join(where)
- _resolvedFields = []
- for key in fields.keys():
- _resolvedFields.append(key + " = '" + fields[key] + "'")
- _resolvedFieldsToStr = ', '.join(_resolvedFields)
- return self.query("UPDATE " + self.__targetTable(table) + " SET " + _resolvedFieldsToStr + ((" WHERE " + whereClasues) if len(where) != 0 else ""))
- def checkIfDatabaseExists(self, name: str):
- result = self.query(
- """
- SELECT EXISTS(
- SELECT datname FROM pg_catalog.pg_database WHERE lower(datname) = lower('"""+ name +"""')
- );
- """)
- value = str(result[0]).replace("(", "").replace(")", "").replace(",", "")
- return True if 'true' in value.lower() else False
- def checkIfSchemaExists(self, name: str):
- result = self.query(
- """
- SELECT EXISTS(
- SELECT schema_name FROM information_schema.schemata WHERE schema_name = '"""+ name +"""'
- );
- """)
- value = str(result[0]).replace("(", "").replace(")", "").replace(",", "")
- return True if 'true' in value.lower() else False
- def __targetTable(self, table: str):
- return (self.schemaTarget + "." + table) if self.schemaTarget != "" else table
- def main():
- sql = SQL()
- sql.connect("127.0.0.1", "postgres", "postgres", "google_codein")
- sql.createSchema("postgresql")
- sql.createTable("users",
- [
- "id SERIAL",
- "username TEXT",
- "password TEXT",
- "email TEXT",
- "PRIMARY KEY(id)"
- ]
- )
- sql.createTable("users_groups",
- [
- "id SERIAL",
- "user_id INT",
- "group_id INT",
- "PRIMARY KEY(id)"
- ]
- )
- sql.createTable("groups",
- [
- "id SERIAL",
- "name TEXT",
- "perms TEXT",
- "PRIMARY KEY(id)"
- ]
- )
- sql.insert("groups",
- {"name": "founder", "perms": "[\"*\"]"})
- sql.insert("groups",
- {"name": "admin", "perms": "[\"admin\", \"moderator\"]"})
- sql.insert("groups",
- {"name": "moderator", "perms": "[\"moderator\"]"})
- sql.insert("users",
- {"username": "John Doe", "password": "password123", "email": "JohnDoes@mail.com"})
- sql.insert("users_groups",
- {"user_id": "1", "group_id": "1"})
- sql.insert("users",
- {"username": "Jan Doe", "password": "pass123", "email": "jannydoey2234@mymail.com"})
- sql.update("users", ["username = 'John Doe'"], {"username" : "Danny Doe"})
- sql.deleteTableRow("users", ["username = 'Jan Doe'"])
- # SQL Results:
- # All John Doe names should be Danny Doe.
- # And No Jan Doe names.
- if __name__ == '__main__':
- main()
Add Comment
Please, Sign In to add comment