Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import json
- import sqlite3
- import inspect
- import re, os, shutil
- import pandas as pd
- def defaultconfig():
- #create default config.json file
- with open('config.json', 'w') as config:
- config_data = {
- "path_to_database": "FUDB/FOLLOWUP.DB",
- "path_to_frontend": "FUDB/",
- "path_to_excels_exported_from_database": "excels exported/",
- "path_to_excels_to_be_imported_in_database": "excels to be imported/",
- "path_to_batches_unassigned": "DC BATCHES IN WORK/1 UNASSIGNED/",
- "path_to_batches_assigned": "DC BATCHES IN WORK/2 ASSIGNED/",
- "path_to_batches_tobechecked": "DC BATCHES IN WORK/3 TO BE CHECKED/",
- "path_to_batches_tbimported": "DC BATCHES IN WORK/4 TO BE IMPORTED/",
- "path_to_batches_finnished": "DC BATCHES IN WORK/5 FINISHED/",
- "path_to_batches_rejected": "DC BATCHES IN WORK/6 REJECTED/",
- "path_to_batches_instandby": "DC BATCHES IN WORK/7 IN STANDBY/",
- "path_to_batches_unrecordable": "DC BATCHES IN WORK/8 UNRECORDABLE/",
- "batch_status_options_responsible": "TO BE CHECKED, ONGOING, FILE PREPARATION, SPLIT FILE, FE MACRO PROCESSING, ISAIM IMPORTATION",
- "batch_status_options_proofreader": "TO BE IMPORTED, FINISHED, REJECTED, STANDBY, UNRECORDABLE, WAITING FOR RESPONSIBLE, CHECKING BY PROOFREADER",
- "batch_status_options_overall": "ONGOING, STANDBY, FINISHED",
- "aircraft": "A300, A300-600, A310, A320, A330, A340, A350, A380",
- "split_batch_factor": "2, 3, 4, 5, 6, 7, 8, 9",
- "generateBigID": "NO",
- "generateCustomID": "YES",
- "customIDlentgh": "6"
- }
- #Write to file
- json.dump(config_data, config)
- def get_jsonfilespath():
- #create a session json file on each call and a config.json file if not found
- try:
- __location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
- configfilepath = open(os.path.join(__location__, 'config.json'))
- with open('session.json', 'w') as sessionfile:
- sessionfilepath = open(os.path.join(__location__, sessionfile.name))
- return configfilepath.name, sessionfilepath.name
- except:
- try:
- defaultconfig() #create default config.json file if not found
- time.sleep(2) #wait for the file to be created
- configfilepath = open(os.path.join(__location__, 'config.json'))
- with open('session.json', 'w') as sessionfile:
- sessionfilepath = open(os.path.join(__location__, sessionfile.name))
- return configfilepath.name, sessionfilepath.name
- except:
- return False, False
- def readjson(filepath):
- #Return a dict form a json file
- with open(filepath) as j:
- adict = json.load(j)
- return adict
- def configInfo():
- #Get db path from dbpath.json file
- configfilepath, sessionfilepath = 'config.json', 'session.json'
- try:
- return readjson(configfilepath)
- except:
- try:
- configfilepath, sessionfilepath = get_jsonfilespath()
- return readjson(configfilepath)
- except Exception as e:
- print("config.json file not found! creating config default Got: ",e)
- time.sleep(3)
- return configInfo()
- return False
- def user_session(user_working, user_password, user_rights):
- #Write current user data session
- configfilepath, sessionfilepath = 'config.json', 'session.json'
- try:
- with open(sessionfilepath, "w") as session:
- current_user_working = user_working
- current_user_password = user_password
- current_user_rights = user_rights
- user_session_data = {"current_user_working": current_user_working,
- "current_user_password": current_user_password,
- "current_user_rights": current_user_rights}
- #Write to file
- json.dump(user_session_data, session)
- except:
- False
- def sessionInfo():
- #Get curent user info from session.json file
- configfilepath, sessionfilepath = 'config.json', 'session.json'
- try:
- return readjson(sessionfilepath)
- except:
- configfilepath, sessionfilepath = get_jsonfilespath()
- return readjson(sessionfilepath)
- def appSettings():
- configfilepath, sessionfilepath = 'config.json', 'session.json'
- try:
- return readjson(configfilepath)
- except:
- configfilepath, sessionfilepath = get_jsonfilespath()
- return readjson(configfilepath)
- config = configInfo()
- session = sessionInfo()
- def connection():
- #Connect to a db and if it not exists creates one with the name given
- dbNamePath = config["path_to_database"]
- try:
- connection = sqlite3.connect(dbNamePath)
- #cursor = connection.cursor()
- return connection
- except:
- return False
- def execute_query(query, keepConn=False):
- #Execute query, commit and close query
- try:#execute query in database
- conn = connection()
- if conn == False:
- print("Connection to db failed")
- return False #if conn fails
- else:
- with conn:
- conn.execute(query)
- if keepConn:
- return True
- else:
- conn.close()
- return True
- except Exception as e:#query was not executed, Try again
- print("Got error: ",e)
- return False
- #Query for creating the followup table in db
- sql_create_table_followup = """CREATE TABLE IF NOT EXISTS `followup` (`BatchID` TEXT,
- `Aircraft` TEXT,
- `Operator` TEXT,
- `OriginalFilesName` TEXT,
- `OriginalFilesPath` TEXT,
- `FilesID` TEXT,
- `AddedDate` TEXT,
- `Responsible` TEXT,
- `Proofreader` TEXT,
- `ResponsibleStatus` TEXT,
- `ProofreaderStatus` TEXT,
- `ResponsibleComment` TEXT,
- `ProofreaderComment` TEXT,
- `OverallStatus` TEXT,
- `TotalRowsNbr` TEXT,
- `MPDTaskRowsNbr` TEXT,
- `OperatorRowsNbr` TEXT,
- `FindingsRowsNbr` TEXT,
- `ChangesLog` TEXT,
- `ImportedDateISAIM` TEXT);
- """
- #used for creation of user table
- sql_create_table_users = """CREATE TABLE IF NOT EXISTS `users` (`UserEmail` TEXT, `UserPassword` TEXT, `UserRights` TEXT, `DefaultProofreader` TEXT, PRIMARY KEY(`UserEmail`));"""
- sql_user_first_use = """INSERT INTO `users`(`UserEmail`, `UserPassword`, `UserRights`, `DefaultProofreader`) VALUES ('{}','{}','{}','{}');""".format('admin@admin.admin', 'admin', 'admin', 'admin@admin.admin')
- sql_delete_default_admin = """ DELETE FROM users WHERE UserEmail='admin@admin.admin' """
- #Query for inserting an user
- sql_insert_user = """INSERT INTO `users`(`UserEmail`, `UserPassword`, `UserRights`, `DefaultProofreader`) VALUES ('{}','{}','{}','{}');"""
- def generateCustomID(lencustomID):
- #Generate a random series of chars upper/lower + numbers
- import string, random
- upper = list(string.ascii_uppercase)
- lower = list(string.ascii_lowercase)
- numbers = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
- allcharli = upper + lower + numbers
- customIDli = []
- for char in range(lencustomID):
- customIDli.append(random.choice(allcharli))
- customID = ''.join(customIDli)
- return customID
- def generateID():
- import uuid
- from datetime import datetime
- genbigID = config["generateBigID"].strip().upper()
- gencustomID = config["generateCustomID"].strip().upper()
- lencustomID = int(config["customIDlentgh"].strip())
- #print(genbigID, gencustomID, lencustomID)
- if genbigID == "YES":
- bigID = datetime.now().strftime('%Y-%m-%d-%H-%M') +'-'+ str(uuid.uuid4())
- return bigID
- elif gencustomID == "YES":
- return generateCustomID(lencustomID)
- else:
- return generateCustomID(6)
- def listifyString(astring, delimiter=','):
- #get a list from a string
- strListifyed = astring.split(delimiter)
- astringListifyed = [s.strip() for s in strListifyed]
- return astringListifyed
- def getfilespath_from(root_path):
- #Walk thru a start path and return a list of paths to files
- import os
- allfiles = []
- for root, dirs, files in os.walk(root_path):
- for file in files:
- path_tofile = os.path.join(root, file)
- allfiles.append(path_tofile)
- return allfiles
- def current_date():
- #Get current date in day-month-year format
- from datetime import datetime
- date = datetime.now().strftime('%d-%m-%Y')
- return date
- def copytree(src, dst, symlinks=False, ignore=None):
- #Copy dirs and it's items from src to dst
- import os, shutil
- if not os.path.exists(dst):
- os.makedirs(dst)
- for item in os.listdir(src):
- s = os.path.join(src, item)
- d = os.path.join(dst, item)
- if os.path.isdir(s):
- copytree(s, d, symlinks, ignore)
- else:
- if not os.path.exists(d) or os.stat(s).st_mtime - os.stat(d).st_mtime > 1:
- shutil.copy2(s, d)
- def prepcell(cell, tolist=False):
- #Remove whitespaces/new line from the string and put words to list if needed
- cell = str(cell).strip().replace('\n', ' ') # using replace to avoid POST\n156424 > POST156424
- cellli = ''.join(cell).split(' ')
- cellli = [c.strip() for c in cellli if len(c) != 0]
- if tolist:
- return cellli
- else:
- cellstr = ' '.join(cellli)
- return cellstr
- def stringifyDF(df):
- #Clean cells and make all columns astype string
- columnsli = df.columns.tolist()
- for col in columnsli:
- df[col] = df[col].apply(lambda cell: prepcell(cell))
- return df
- def validate(args, paramdict):
- #check if not empty return a dict {idx:val}
- validateddict = {}
- for arg in args:
- if len(paramdict[arg]) != 0:
- validateddict[arg] = paramdict[arg]
- return validateddict
- def dcsinfo(dcspath):
- from lxml import etree
- file = open(dcspath)
- tree = etree.parse(file)
- sumAll = tree.xpath('//sum')
- totalRows = sum([int(s.text) for s in sumAll])
- sumMpd = tree.xpath('//mpdTask//sum')
- mpdtask = sum([int(s.text) for s in sumMpd])
- sumOp = tree.xpath('//opeTask//sum')
- optask = sum([int(s.text) for s in sumOp])
- sumFindings = tree.xpath("//finding[@activated='true']//sum")
- findings = sum([int(s.text) for s in sumFindings])
- infodcs = {"totalrows": totalRows,
- "mpdrows": mpdtask,
- "operatorrows": optask,
- "findingsrows": findings
- }
- return infodcs
- def create_table_followup():
- return execute_query(sql_create_table_followup)
- def get_dftable(table_name):
- #gets the table from the db
- conn = connection()
- query = "SELECT * FROM {}".format(table_name)
- df = pd.read_sql_query(query, conn)
- return df
- def create_table_users():
- try:
- execute_query(sql_create_table_users)
- user_df = get_dftable('users')
- rows, cols = user_df.shape
- if rows == 0:
- return execute_query(sql_user_first_use)
- elif rows > 1:
- return execute_query(sql_delete_default_admin)
- return True
- except:
- return False
- def get_usersdict(listallusers=False):
- user_dict = get_dftable('users').to_dict('list')
- if listallusers == True:
- return user_dict['UserEmail']
- else:
- return user_dict
- def check_user(username, password):
- usersdict = get_usersdict()
- for i, email in enumerate(usersdict['UserEmail']):
- try:
- user = email.split('@')[0]
- except:
- pass
- if username == user or username == email:
- password_db = usersdict['UserPassword'][i]
- if password_db == password:
- rights = usersdict['UserRights'][i]
- user_session(email, password_db, rights)
- return True
- else:
- user_session('NoEmail', 'NoPass', 'NoRights')
- return False
- else:
- user_session('NoEmail', 'NoPass', 'NoRights')
- return False
- def users_data():
- #Get from the users table all the proofreaders
- usersdf = get_dftable('users')
- usersProofs = usersdf[usersdf['UserRights'] == 'proofreader']
- proofslist = usersProofs['UserEmail'].tolist()
- context = {'proofreaderList': proofslist,
- 'UserEmail': usersdf['UserEmail'].tolist(),
- 'UserPassword': usersdf['UserPassword'].tolist(),
- 'UserRights': usersdf['UserRights'].tolist(),
- 'DefaultProofreader': usersdf['DefaultProofreader'].tolist()
- }
- return context
- def add_user(useremail, userpassword, user_right, defaultProofreader):
- if len(defaultProofreader) == 0:
- defaultProofreader = "UNASSIGNED"
- insert_user = sql_insert_user.format(useremail, userpassword, user_right, defaultProofreader)
- return execute_query(insert_user)
- def update_tbcell(table_name, coltoUpdate, colValtoUpdate, colID, rowID):
- update_batch = """UPDATE "{}" SET "{}"="{}" WHERE "{}"="{}";""".format(table_name, coltoUpdate, colValtoUpdate, colID, rowID)
- if execute_query(update_batch) == True:
- return True
- else:
- return False
- def update_user(userEmail, coltoUpdate, colValtoUpdate):
- return update_tbcell('users', coltoUpdate, colValtoUpdate, 'UserEmail', userEmail)
- def modify_user(UserEmail, UserPassword, UserRights, DefaultProofreader):
- frame = inspect.currentframe()
- args, _, _, paramdict = inspect.getargvalues(frame)
- validated = validate(args, paramdict)
- for col, val in validated.items():
- if col == 'UserEmail': continue
- if update_user(validated['UserEmail'], col, val):
- pass
- else:
- return False
- return True
- def remove_user(UserEmail):
- delete_user = """DELETE FROM users WHERE UserEmail='{}' """.format(UserEmail)
- return execute_query(delete_user)
- def viewBatches():
- df_dict = get_dftable('followup').to_dict('list')
- return df_dict
- def updateBatchOptions():
- #get batch update options depending on the user type (user/responsible, admin or proofreader)
- update_options_responsible = listifyString(config['batch_status_options_responsible'])
- update_options_proofreader = listifyString(config['batch_status_options_proofreader'])
- update_options_overall = listifyString(config['batch_status_options_overall'])
- aircraft = listifyString(config['aircraft'])
- split_batch_factor = listifyString(config['split_batch_factor'])
- allusers = get_usersdict(True)
- current_user_rights = session['current_user_rights']
- update_batch_dict = {"responsibleStatus": update_options_responsible,
- "proofreaderStatus": update_options_proofreader,
- "overallStatus": update_options_overall,
- "aircraft": aircraft,
- "splitBatch": split_batch_factor,
- "allusers": allusers,
- "disableCommentResponsible": '',
- "disableCommentProofreader": '',
- "disableCheckbox": ''
- }
- if current_user_rights == 'user':
- update_batch_dict["proofreaderStatus"] = ['You cannot change this']
- update_batch_dict["allusers"] = ['You cannot change this']
- update_batch_dict["overallStatus"] = ['You cannot change this']
- update_batch_dict["disableCommentProofreader"] = "disabled"
- update_batch_dict["disableCheckbox"] = "disabled"
- return update_batch_dict
- elif current_user_rights == 'admin' or current_user_rights == 'proofreader':
- update_batch_dict["disableCommentResponsible"] = "disabled"
- return update_batch_dict
- def tb_cols_placeholder(tableName):
- #Get the column names and make placeholders for them
- df = get_dftable(tableName)
- colsname = tuple(df.columns.tolist())
- phli = []
- for n in range(len(colsname)):
- phli.append('{}')
- emptyplaceholders = tuple(phli)
- colsphdict = {"columns": colsname, "placeholders": emptyplaceholders}
- return colsphdict
- def sql_insertDict(tableName, infoaddDict):
- columns = tuple(infoaddDict.keys())
- values = tuple(infoaddDict.values())
- sql_insert = """INSERT INTO {} {} VALUES {};"""
- insert = sql_insert.format(tableName, columns, values)
- return execute_query(insert)
- def addBatch(infoaddDict):
- infoaddDict['Responsible'] = 'UNASSIGNED'
- infoaddDict['Proofreader'] = 'UNASSIGNED'
- infoaddDict['ResponsibleStatus'] = 'UNASSIGNED'
- infoaddDict['ProofreaderStatus'] = 'UNASSIGNED'
- infoaddDict['OverallStatus'] = 'UNASSIGNED'
- return sql_insertDict('followup', infoaddDict)
- def getChangesLog(batchID):
- #get change log for batch ID as a list
- df = get_dftable('followup')
- df_batch = df[df['BatchID'] == batchID]
- return df_batch['ChangesLog'].tolist()
- def getUnassignedBatch(batchID, usertype):
- #Select an UNASSIGNED batch from the followup by batchID if found or
- #return first met UNASSIGNED batch
- df = get_dftable('followup')
- df_unassigned = df[df[usertype] == 'UNASSIGNED']
- df_batch = df_unassigned[df_unassigned['BatchID'] == batchID]
- try:
- if df_batch.shape[0] == 0:
- df_batchUnassigned = df[df[usertype] == 'UNASSIGNED'].head(1)
- selected_Unassigned = df_batchUnassigned['BatchID'].tolist()[0]
- return selected_Unassigned
- else:
- selected_BatchID = df_batch['BatchID'].tolist()[0]
- return selected_BatchID
- except Exception as e:
- #print("\n\ngetUnassignedBatch error: {}, \n df {}\n\n".format(e, df_batch.shape[0]))
- return False
- def checkAssignStatus():
- df = get_dftable('followup')
- df_assignedUser = df[df['ResponsibleStatus'] == 'ASSIGNED']
- df_assignedProof = df[df['ProofreaderStatus'] == 'ASSIGNED']
- assignedBatchesUser = df_assignedUser['BatchID'].tolist()
- assignedBatchesProof = df_assignedProof['BatchID'].tolist()
- communliBatch = list(set(assignedBatchesUser).intersection(assignedBatchesProof))
- return communliBatch
- def checkOverallStatus():
- #if responsible and proofreader status are ASSIGNED set OverallStatus to ASSIGNED
- setOverallBatchToAssignedli = checkAssignStatus()
- update_followup_overallstatus = """UPDATE followup SET OverallStatus="{}" WHERE BatchID="{}";"""
- if len(setOverallBatchToAssignedli) != 0:
- for batch in setOverallBatchToAssignedli:
- if execute_query(update_followup_overallstatus.format("ASSIGNED", batch)) == True:
- pass
- else:
- return False
- return True
- else:
- return True
- def createAssignedDirFiles(unassignedBatch):
- user = session['current_user_working']
- if re.search('@', user):
- user = user.split('@')[0]
- dir_unassigned = config['path_to_batches_unassigned']
- dir_assigned = config['path_to_batches_assigned']
- dir_frontend = config['path_to_frontend']
- dir_feli = os.listdir(dir_frontend)
- dir_feli = [f for f in dir_feli if re.search('.xlsm', f)]
- dir_feFile = [f for f in dir_feli if not re.search('BETA', f.upper())]
- dir_bidli = os.listdir(dir_unassigned)
- try:
- for biddir in dir_bidli:
- if re.search(unassignedBatch, biddir):
- #Get the unassigned and assigned folders paths
- opfile_dirunassigned = os.path.join(dir_unassigned, biddir)
- opfile_dirassigned = os.path.join(dir_assigned, str(user+'-'+biddir))
- #Make a new directory in the Assigned folder
- os.mkdir(opfile_dirassigned)
- #Copy the FE macro to the folder created
- fepathfile = os.path.join(dir_frontend, dir_feFile[0])
- shutil.copy2(fepathfile, opfile_dirassigned)
- #Create also here the OP FILE folder and copy here the files from unassigned
- opfilepath = os.path.join(opfile_dirassigned, 'OP FILE')
- os.mkdir(opfilepath)
- org_filesli = getfilespath_from(opfile_dirunassigned)
- org_files = [f for f in org_filesli if not re.search('Thumbs.db', f)]
- for file in org_files:
- shutil.copy2(file, opfilepath)
- #Rename the FE macro
- filesinassigned = os.listdir(opfile_dirassigned)
- fenameold = [f for f in filesinassigned if re.search('.xlsm', f)][0]
- fefileold = os.path.join(opfile_dirassigned, fenameold)
- fenamenew = unassignedBatch+'-'+fenameold
- fefilenew = os.path.join(opfile_dirassigned, fenamenew)
- os.rename(fefileold, fefilenew)
- return True
- except Exception as e:
- print("GOT: ", e)
- return False
- def updatedict_sq(tableName, updatedict, colIDName):
- colsvals = []
- for col, val in updatedict.items():
- if col != colIDName:
- sqval = col + '=' + "'{}'".format(val)
- colsvals.append(sqval)
- else:
- whereCol_value = col + '=' + "'{}'".format(val)
- colstoUpdate = ', '.join(colsvals)
- q = str("UPDATE " + tableName + " SET " + colstoUpdate + " WHERE " + whereCol_value + ";")
- return execute_query(q)
- def assignBatchtoUser(batchID, assignedtoProofreader):
- if checkOverallStatus() == True:
- date = current_date()
- userinfo = sessionInfo()
- responsible_user = userinfo["current_user_working"]
- tableName = 'followup'
- updatedict = {"BatchID": batchID,
- "Responsible": responsible_user,
- "ResponsibleStatus": "ASSIGNED",
- "Proofreader": responsible_user,
- "ProofreaderStatus": "ASSIGNED"
- }
- colIDName = "BatchID"
- if assignedtoProofreader == True:
- unassignedBatch = getUnassignedBatch(batchID, 'ProofreaderStatus')
- updatedict.pop("Responsible", None)
- updatedict.pop("ResponsibleStatus", None)
- changeLoglist = getChangesLog(unassignedBatch)
- loginfo = "ASSIGNED to {} on {}".format(responsible_user, date)
- updatedict["ChangesLog"] = loginfo
- if updatedict_sq(tableName, updatedict, colIDName) == True:
- print('TRUE updatedict_sq')
- checkOverallStatus()
- createAssignedDirFiles(unassignedBatch)
- return True
- elif assignedtoProofreader == False:
- unassignedBatch = getUnassignedBatch(batchID, 'ResponsibleStatus')
- updatedict.pop("Proofreader", None)
- updatedict.pop("ProofreaderStatus", None)
- changeLoglist = getChangesLog(unassignedBatch)
- loginfo = "ASSIGNED to {} on {}".format(responsible_user, date)
- updatedict["ChangesLog"] = loginfo
- if updatedict_sq(tableName, updatedict, colIDName) == True:
- print('FALSE updatedict_sq')
- checkOverallStatus()
- createAssignedDirFiles(unassignedBatch)
- return True
- else:
- return False
Add Comment
Please, Sign In to add comment