Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # EDU-MARKER - TEACHER MODULE
- # DANIEL PERKS - 175038
- # Requirements:
- # nltk.download('all')
- # Imports - Importing required modules
- import sqlite3 # Used for database structuring and read
- import socket # Used for peer-to-peer connection via LAN
- import sys # Used for a few system functions, mainly sys.exit - should remove
- import datetime # Used to gather and print the current date/time for log
- import threading # Used to thread a function for accepting students
- import time # Used for its .sleep function to add pauses to code
- import random # Used to randomly generate the port/password for tests
- import getpass # Gets the users Windows username
- import nltk
- from nltk.downloader import Downloader # Used for word analysis
- import urllib # Used to parse web URLs
- import requests # Used to retrieve a webpages content
- import wikipedia # Used to query wikipedia
- import threading # Used to thread multiple fucntions at the same time
- #from fuzzy import start # The file containing the fuzzy logic marking programm
- from bs4 import BeautifulSoup # Used to parse and extract data from website requests
- from easygui import * # Used for the GUI
- #from fuzzy import mark as fuzzy
- from tkinter import Tk
- from contextlib import contextmanager
- @contextmanager
- def tk(timeout=5):
- root = Tk() # default root
- root.withdraw() # remove from the screen
- # destroy all widgets in `timeout` seconds
- func_id = root.after(int(1000*timeout), root.quit)
- try:
- yield root
- finally: # cleanup
- root.after_cancel(func_id) # cancel callback
- root.destroy()
- class Encoder:
- @staticmethod
- def decoder(string):
- try:
- newstring = string[1:]
- newstring = bytearray.fromhex(newstring).decode()
- except (ValueError, TypeError):
- newstring = string
- return newstring
- @staticmethod
- def encoder(string):
- string = "T" + (string.encode('utf-8')).hex()
- return string
- class Network: # Selection of networking tools
- def __init__(self):
- self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.host = str(socket.gethostbyname(socket.gethostname()))
- self.port = random.randint(9999, 50000)
- self.names = {}
- self.server.bind((self.host, self.port))
- self.clients = {}
- self.users = []
- self.portOpen = 1
- def start(self, test, group): # Starts a server on the local IP with a random port
- roomno = self.host.split(".")
- roomno = roomno[2] + "." + roomno[3]
- #roomno = ".".join(roomno)
- msg = "You are running the test named: " + \
- test.capitalize() + "\nYou are running this for class: " + \
- group + "\n\nStudents should use these login details:" \
- "\nRoom Number: " + str(roomno) + "\nPassword: " + str(self.port) + "\n\n\n" \
- "Press start to start accepting students..."
- # todo Make the live student update
- clr()
- print("Room Number: " + roomno + " Password: " + str(self.port))
- print("Press enter to start the test once all students are connected.")
- print("IP: " + str(self.host))
- print("\nConnected Students: ")
- msgbox(msg, "EduMarker - Starting...", ok_button="Start")
- self.server.listen(3)
- acceptThread = threading.Thread(target=self.accept)
- acceptThread.start()
- input()
- self.portOpen = 0
- questions = []
- databasedata = database.read_questions(database.testsDB.cursor(), test)
- for question in databasedata:
- questions.append(question[1])
- send = "startx " + str(questions)
- send = send.encode()
- for user in self.users:
- conn = self.clients[user]
- conn.sendall(send)
- answers = self.recieveAnswers()
- name = str(group) + "-" + str(test)
- database.saveAnswers(name,answers)
- msgbox("All students have completed the test, and their results have been recorded.\n"
- "Please use the 'Mark a Test' option on the main menu to mark this test\n\n"
- "You will now be sent back to the main menu",
- "EduMarker - Test Complete", "Continue")
- main.home()
- return
- def recieveAnswers(self):
- clients = self.clients
- users = self.users
- answers = []
- while True:
- for user in users:
- conn = clients[user]
- answer = ""
- try:
- answer = (conn.recv(1024)).decode()
- except ConnectionResetError:
- msgbox("Student " + str(self.names[user]) + " has left the room.")
- if answer != None and answer != "":
- #print("Data: " + str(answer))
- answers.append(answer)
- # answer = answer[1:-1].split(",")
- # user = (answer[0])[1:-1]
- # question = (answer[1])[1:-1]
- # answer = (answer[2])[1:-1]
- # print(user)
- # print(question)
- # print(answer)
- # print("")
- else:
- clients.pop(user)
- users.pop(users.index(user))
- if len(users) == 0:
- return answers
- def accept(self): # Accepts clients for an open server
- conn = ""
- add = ""
- if self.portOpen == 1:
- conn, add = self.server.accept()
- if self.portOpen == 1:
- user = (conn.recv(1024)).decode()
- user = user.split(";")
- username = user[0]
- fullname = user[1]
- statement = ('Student ' + fullname + ' connected from user ' + username)
- print(statement)
- self.users.append(username)
- self.names[username] = fullname
- self.clients[username] = conn
- conn.sendall(b'Connected')
- acceptThread = threading.Thread(target=self.accept)
- acceptThread.start()
- class Data: # The tools to send and receive questions and answers
- def __init__(self):
- self.students = []
- self.questions = []
- self.answers = []
- self.testReg = ""
- self.testCon = ""
- self.groupNo = 0
- try:
- self.testsDB = sqlite3.connect('questions.db')
- self.resultsDB = sqlite3.connect('results.db')
- except sqlite3.DatabaseError:
- msg = "Unfortunately, a quiz cannot be started at this time.\n" \
- "This is likely because the program is already in use on this computer\n" \
- "Please close any programs using the file and try again\nDo you want to try again?"
- if ynbox(msg, "EduMarker - Error") is True:
- main.home()
- def commit(self):
- self.testsDB.commit()
- self.resultsDB.commit()
- def __del__(self):
- self.commit()
- def read_questions_start(self): # reads the questions from the file
- cur = self.testsDB.cursor()
- cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
- quizoptions = cur.fetchall()
- quizoptionsconv = []
- for test in quizoptions:
- if test != ('sqlite_sequence',):
- test = Encoder.decoder(str(test)[2:-3])
- quizoptionsconv.append(test)
- if not quizoptionsconv:
- msgbox("There are no tests available right now.\nMake one from the main menu.")
- main.home()
- else:
- quizoptionsconv.append("\n")
- quizoptionsconv.append("Make a new test...")
- values = choicebox("What test would you like to start?", "EduMark - Setup", quizoptionsconv)
- if values == "\n" or values == None or values.strip() == "":
- if ynbox("Would you like to cancel the test?", "EduMarker - Exit") is True:
- main.home()
- else:
- self.read_questions_start()
- else:
- index = quizoptionsconv.index(values)
- values = quizoptions[index][0]
- self.testReg = values
- self.testCon = Encoder.decoder(self.testReg)
- try:
- cur = self.testsDB.cursor()
- cur.execute("SELECT * FROM " + values)
- self.questions = cur.fetchall()
- except sqlite3.OperationalError:
- msg = "Unfortunately, there is not a test with this name. Please try again..."
- msgbox(msg, "EduMarker - Error")
- self.read_questions_start()
- else:
- group = multenterbox("Please enter the class taking this test...", "EduMark - Setup", ["Class"])
- if group == None:
- if ynbox("Are you sure you want to cancel the test?", "EduMarker - Quit") is True:
- main.home()
- return
- else:
- self.read_questions_start()
- if any([letter == "-" for letter in group[0]]) is True:
- msgbox("You used an invalid character: '-'\nPlease choose a name without this character.")
- self.read_questions_start()
- return
- if group is not None:
- return group[0]
- else:
- if ynbox("Would you like to exit the test creator?", "EduMarker - Exit") is True:
- main.home()
- def read_questions(self, db, test):
- cursor = db
- test = Encoder.encoder(test)
- cursor.execute("SELECT * FROM " + test)
- data = cursor.fetchall()
- return data
- def read_tables(self, db):
- cursor = db.cursor()
- cursor.execute("SELECT name FROM sqlite_master WHERE type is 'table'")
- data = cursor.fetchall()
- return data
- def read_data(self, db, table):
- cursor = db.cursor()
- cursor.execute("SELECT * FROM " + table)
- data = cursor.fetchall()
- return data
- def saveAnswers(self,name,answers):
- cursor = self.resultsDB.cursor()
- print(answers)
- data = {}
- for k in answers:
- k = eval(k)
- try:
- data[k[0]] += [(int(k[1]), k[2])]
- except KeyError:
- data[k[0]] = [(int(k[1]), k[2])]
- # data = str(data)
- name = encoder.encoder(name)
- cursor.execute("CREATE TABLE " + name + " (`studName` TEXT,`qNo` INTEGER,`qAns` TEXT,"
- "PRIMARY KEY(`studName`,`qNo`));")
- for stud in data:
- for k in data[stud]:
- cursor.execute("INSERT INTO " + str(name) + "('studName', 'qNo', 'qAns') VALUES ("''+ str(stud) + "'," + str(k[0]) + ",'" + str(k[1]) + "');")
- class Editor:
- def __init__(self):
- self.test = None
- self.testid = 0
- self.testen = ""
- def create(self):
- cursor = database.testsDB.cursor()
- title = "EduMark - Test Creation"
- name = (multenterbox("What is the name of this test?", title, ["Test Name"]))
- if name is None:
- if ynbox("Would you like to exit the test creator?", "EduMarker - Exit") is True:
- main.home()
- else:
- self.create()
- sys.exit()
- elif name == "":
- msgbox("You did not enter a test name\n Try again...")
- self.create()
- elif any([letter == "-" for letter in name]) is True:
- msgbox("You used an invalid character: '-'\nPlease choose a name without this character.")
- self.create()
- return
- else:
- name = Encoder.encoder(name[0])
- try:
- cursor.execute("CREATE TABLE " + name + " (`QID` INTEGER PRIMARY KEY UNIQUE,"
- "`Question` TEXT, `Answer` TEXT );")
- except sqlite3.OperationalError:
- msgbox("This test already exsists.\nPlease use a different name or edit the exsisting test.")
- self.create()
- questions = None
- while questions is None:
- questions = (multenterbox("How many questions will be in this test?"
- "", title, ["Number of Questions"]))[0]
- if questions is None:
- if ynbox("You did not enter a number of questions.\nDo you want to quit the test creator?",
- "EduMarker - Quit") is True:
- main.home()
- else:
- self.create()
- if questions.isdigit() is False:
- if ynbox("You did not enter a number of questions.\nDo you want to quit the test creator?",
- "EduMarker - Quit") is True:
- main.home()
- else:
- self.create()
- elif int(questions[0]) < 0:
- if ynbox("You did not enter a valid number of questions.\nDo you want to quit the test creator?",
- "EduMarker - Quit") is True:
- main.home()
- else:
- self.create()
- else:
- questions = questions[0].lower()
- for questionNo in range(int(questions)):
- entry = None
- while entry is None:
- entry = multenterbox("Enter question " + str(questionNo+1) +
- "'s details:", title, ["Question", "Answer(s)"])
- if entry is None:
- if ynbox("Do you want to quit the test editor?", "EduMarker - Quit") is True:
- main.home()
- if entry != None: # Does not work with 'is' like suggested
- exsisting = database.read_tables(database.testsDB)
- if name in exsisting:
- msgbox("A test with this name already exsists.\n"
- "Please pick a different name or edit the exsisting test.")
- self.create()
- else:
- cursor.execute("INSERT INTO " + name + " ('Question', 'Answer') "
- "VALUES ('" + entry[0] + "', '" + entry[1] + "')")
- database.commit()
- else:
- msgbox("You did not enter a question and answer for this test\nPlease try again...")
- msgbox("Your test has been made successfully\nYou will now go back to the main menu",
- "EduMarker - Test Created", ok_button="Continue")
- main.home()
- def editstart(self):
- msg = "Pick which test you want to edit?"
- title = "EduMark - Test Editor"
- choices = database.read_tables(database.testsDB)
- choices_new = []
- self.test = None
- for loop in range(len(choices)):
- choice = choices[loop][0]
- if choice != "sqlite_sequence":
- choice = encoder.decoder(str(choice))
- choices_new.append(choice)
- if not choices_new:
- msgbox("There are no tests available.\nPlease create one from the main menu", "EduMarker - Error")
- main.home()
- elif len(choices_new) is 1:
- self.test = choices_new[0]
- msgbox("There is only one test available now\nTherefore you will now begin editing this test"
- "\n\nTest name: " + str(self.test))
- else:
- self.test = (choicebox(msg, title, choices_new))
- if self.test is None:
- if ynbox("You did not pick a test.\nDo you want to quit the test editor?",
- "EduMarker - Quit") is True:
- main.home()
- else:
- self.editstart()
- self.testid = choices_new.index(self.test)
- self.edit()
- def edit(self):
- db = database.testsDB.cursor()
- questions = database.read_questions(db, self.test)
- questions_new = []
- for question in questions:
- questions_new.append(str(question[1]) + " - " + str(question[2]))
- if questions_new is []:
- questions_new.append("No questions added, click here to add one!")
- if len(questions_new) == 1:
- questions_new.extend(["", "Add new question...", "Delete this test..."])
- else:
- questions_new.extend(["", "Add new question...", "Delete a question...", "Delete this test..."])
- question = ""
- while question == "":
- question = choicebox("Which question would you like to edit?", "EduMarker - Edit Test", questions_new)
- if question is None:
- if ynbox("Are you sure you want to go back to the main menu?", "EduMarker - Quit") is True:
- main.home()
- return
- else:
- self.edit()
- return
- quest = question.split(" - ")
- if question == "Add new question..." or question == "No questions added, click here to add one!":
- self.addquestion(self.test)
- elif question == "Delete a question...":
- self.deleteq(self.test)
- elif question == "Delete this test...":
- self.deletet(self.test)
- else:
- sql = None
- new_question = multenterbox("Please enter the new question and/or answers...",
- "EduMarker - Edit Test", ["Question", "Answer"], [quest[0], quest[1]])
- if self.test[0] != "T":
- self.testen = Encoder.encoder(self.test)
- questid = str(questions[questions_new.index(question)][0])
- if new_question is None:
- if ynbox("Are you sure you want to leave this unedited?", "EduMarker - Close") is True:
- main.home()
- return
- else:
- self.edit()
- return
- if new_question[0] is quest[0] and new_question[1] is quest[1]:
- msgbox("You did not change either the question or answer\n"
- "Try again or cancel by closing the window.")
- elif new_question[0] is quest[0]:
- sql = "UPDATE " + str(self.testen) + " SET answer ='" + \
- str(new_question[1]) + "' WHERE QID = " + questid
- elif new_question[1] is quest[1]:
- sql = "UPDATE " + str(self.testen) + " SET question='" + \
- str(new_question[0]) + "' WHERE QID = " + questid
- elif new_question[0] is None and new_question[1] is None:
- msgbox("Both the question and answer box were empty\n"
- "Try again or cancel by closing the window.")
- self.edit()
- else:
- sql = "UPDATE " + str(self.testen) + " SET question='" + \
- str(new_question[0]) + "', answer ='" + \
- str(new_question[1]) + "' WHERE QID = " + questid
- db.execute(sql)
- database.commit()
- self.edit()
- def addquestion(self, table):
- cursor = database.testsDB.cursor()
- entry = multenterbox("Enter a new question:", "EduMarker - Add Question", ["Question", "Answer(s)"])
- if entry is None:
- if ynbox("You didn't enter anything.\nDid you want to quit?"):
- main.home()
- return
- else:
- self.edit()
- return
- table = Encoder.encoder(table)
- statement = "INSERT INTO " + table + " ('Question', 'Answer') " \
- "VALUES ('" + entry[0] + "', '" + entry[1] + "')"
- cursor.execute(statement)
- database.commit()
- self.edit()
- def deleteq(self, test):
- question = None
- while question is None:
- db = database.testsDB.cursor()
- questions = database.read_questions(db, self.test)
- questions_new = []
- for item in questions:
- item = (item[1] + " - " + item[2])
- questions_new.append(item)
- question = choicebox("Which question would you like to delete?", "EduMark - Delete Question", questions_new)
- if question is None:
- if ynbox("You didn't choice a question to delete.\nDo you still want to delete a question?"):
- self.deleteq(self.test)
- return
- else:
- main.home()
- return
- sql = "DELETE FROM " + Encoder.encoder(test) + " WHERE QID='" + \
- str(questions[questions_new.index(question)][0]) + "';"
- db.execute(sql)
- database.commit()
- self.edit()
- def deletet(self, table):
- cursor = database.testsDB.cursor()
- table = Encoder.encoder(table)
- sql = "DROP TABLE IF EXISTS " + table
- if ynbox("Are you sure you want to delete this table?", "EduMarker - Confirm Delete"):
- cursor.execute(sql)
- database.commit()
- self.editstart()
- class Marking:
- def markAnswer(self, results, testname, fullname):
- answers = {}
- questions = database.read_questions(database.testsDB.cursor(), testname)
- for i in results:
- try:
- answers[i[0]][i[1]] = i[2]
- except KeyError:
- answers[i[0]] = {i[1]: i[2]}
- print(questions)
- print(answers)
- def chooseTest(self):
- classes = database.read_tables(database.resultsDB)
- newclasses = [encoder.decoder(element[0]) for element in classes]
- newclasses = [element.split("-")[0] for element in newclasses]
- newclasses = list(set(newclasses))
- newclasses.append("")
- newclasses.append("Create A New Class")
- testClass = choicebox("Choose which Class you want to mark...","EduMarker - Choose Class",newclasses)
- while testClass is "":
- testClass = choicebox("Choose which Class you want to mark...","EduMarker - Choose Class",newclasses)
- if testClass == "Create A New Class":
- main.home()
- testClass = classes[newclasses.index(testClass)][0]
- tests = [element[0] for element in classes]
- testsEn = []
- testsDe = []
- testNamesDe = []
- for element in tests:
- if testClass in element:
- testsEn.append(element)
- testsDe.append(Encoder.decoder(element))
- testNamesDe.append((Encoder.decoder(element)).split("-")[1])
- testsPlus = testNamesDe.copy()
- testsPlus.append("")
- testsPlus.append("Start or Create A New Test")
- test = choicebox("Choose which Test you want to mark...","EduMarker - Choose Test",testsPlus)
- while test is "":
- test = choicebox("Choose which Test you want to mark...","EduMarker - Choose Test",testsPlus)
- if test == "Start or Create A New Test":
- main.home()
- else:
- fullName = testsDe[testNamesDe.index(test)]
- testName = test
- results = database.read_data(database.resultsDB, testClass)
- return results, testName, fullName
- def getWebResults(self,query,marks,level):
- pass
- class MainMenu:
- def __init__(self):
- self.user = getpass.getuser()
- def home(self):
- log("Main menu accessed")
- msg = "Welcome to EduMarker, " + self.user + "!\nWhat would you like to do?"
- title = "EduMarker - Main Menu"
- choices = ["Start A Test - Begin a existing test for an existing class",
- "Mark A Test - Mark a completed test for a class", "",
- "Create a Test - Create a new test from scratch",
- "Edit a Test - Edit the Questions and Answers of an existing test",
- "", "Exit the program"]
- choice = choicebox(msg, title, choices)
- if choice == choices[0]: # Start a test
- log("Test Setup Begun")
- group = database.read_questions_start() # Read the Questions
- network.start(database.testCon, group) # Start the test
- elif choice == choices[1]: # Mark a test
- log("Marking Begun")
- a, b, c = marking.chooseTest()
- marking.markAnswer(a,b,c)
- elif choice == choices[3]: # Create a test
- log("Test Creation Begun")
- editor.create()
- elif choice == choices[4]: # Edit a test
- log("Test Editing Begun")
- editor.editstart()
- elif choice == choices[6]: # Exit the program
- msg = "Are you sure you want to exit?"
- if ynbox(msg, "EduMarker - Close") is True:
- log("Program Closed via Main Menu option")
- sys.exit()
- else:
- self.home()
- elif choice is None:
- log("Program Closed by Close Button")
- sys.exit()
- elif choice == "":
- self.home()
- else:
- msgbox("Honestly, I have no idea how you got here.\nThis is technically unreachable", "The Impossible Error")
- def clr():
- print("\n"*80) # Todo remove if unused at end
- def log(statement):
- logfile = open("log.txt","a")
- timeRec = datetime.datetime.now()
- string = "\n- " + str(timeRec) + " - " + str(statement)
- logfile.write(string)
- logfile.close()
- def startEduMark():
- logfile = open("log.txt", "a")
- timeRec = datetime.datetime.now()
- string = "\n- " + str(timeRec) + " - NEW INSTANCE OF PROGRAM STARTED"
- logfile.write(string)
- logfile.close()
- #marking.chooseTest()
- #try:
- main.home()
- #except Exception as e:
- #msgbox("An error occured, please try again...","EduMarker - Error")
- #log("ERROR: Crash occured: " + str(e))
- # TODO PEP8
- def download():
- nltk.download('punkt')
- nltk.download('averaged_perceptron_tagger')
- network = Network()
- database = Data()
- editor = Editor()
- marking = Marking()
- encoder = Encoder()
- main = MainMenu()
- d = Downloader()
- if not d.is_installed('punkt') or not d.is_installed('averaged_perceptron_tagger'):
- p1 = threading.Thread(target=download, daemon=True)
- p1.start()
- msgbox(" PERFORMING A ONE TIME INITAL SETUP\n ------------------------------------------------\n\nPlease wait while some packages are downloaded that are required for this program to run.\nThe download should not take more than a few minutes.\n\nYou can close this popupm, the program will start when the download is complete.")
- while True:
- if p1.is_alive() is False:
- startEduMark()
- break
- else:
- startEduMark()
- # ORDER OF RUNNING #
- # Retrieve Q&As
- # Start server
- # Connect students
- # Run test
- # todo add exits for all easygui
- # todo fix missing test when starting one
Add Comment
Please, Sign In to add comment