Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import sys
- import os
- import email
- import base64
- import binascii
- import re
- import random
- import pickle
- import traceback
- from w3lib.html import remove_tags
- from collections import Counter
- from googletrans import Translator
- import nltk
- from nltk import word_tokenize, WordNetLemmatizer
- from nltk.corpus import stopwords
- from nltk import NaiveBayesClassifier, classify
# Application metadata reported by the "-info" command.
APP_NAME = "AntiSpamFilter"
APP_VERSION = "v1"
MY_NAME = "Glodeanu Irina-Elena"
ALIAS = "Avion"

# Fetch the NLTK resources used for tokenizing and lemmatizing email text.
for resource in ('wordnet', 'punkt', 'stopwords'):
    nltk.download(resource)

# English stopwords filtered out during feature extraction.
stoplist = stopwords.words('english')
def doInfo(output_file):
    """Write the application/author identification block to *output_file*."""
    info_lines = (APP_NAME, MY_NAME, ALIAS, APP_VERSION)
    writeInFile(output_file, "\n".join(info_lines))
def writeInFile(output_file, textToWrite):
    """Overwrite *output_file* with *textToWrite*.

    Fix: the original opened the file without a context manager, leaking
    the handle if the write raised. Mode "w" truncates exactly like the
    original "w+"; the read capability was never used.
    """
    with open(output_file, "w") as f:
        f.write(textToWrite)
def extractEnglishWords(sentence):
    """Tokenize *sentence* and return the lemmatized, lowercase forms of the
    alphabetic, non-stopword tokens.

    Bug fixed: stopword membership was tested on the raw token, but NLTK's
    stoplist is all lowercase, so capitalized stopwords ("The", "And", ...)
    leaked into the feature set. The check now uses the lowercased token.
    """
    lemmatizer = WordNetLemmatizer()
    words = []
    for token in word_tokenize(sentence):
        lowered = token.lower()
        # keep only purely alphabetic tokens that are not stopwords
        if token.isalpha() and lowered not in stoplist:
            words.append(lemmatizer.lemmatize(lowered))
    return words
def translateToEnglish(textToTranslate):
    """Return *textToTranslate* translated to English.

    The language is detected from the first 100 characters; text already
    in English is returned unchanged, anything else is translated in
    100-character chunks.
    """
    translator = Translator()
    detected = translator.detect(textToTranslate[:100]).lang
    if detected == "en":
        return textToTranslate
    print("found language: " + detected + "\n")
    chunk_size = 100
    translated_parts = []
    for start in range(0, len(textToTranslate), chunk_size):
        chunk = textToTranslate[start:start + chunk_size]
        print(chunk + "\n")
        translated_parts.append(
            translator.translate(text=chunk, src=detected, dest="en").text
        )
    return "".join(translated_parts)
def extractText(emailText):
    """Parse a raw email string and return its (English-translated) text.

    Hyperlinks are stripped from the body, then the body shape is
    inspected: HTML bodies are tag-stripped, base64 bodies are decoded,
    anything else is used verbatim; the subject line is prepended back for
    the processed variants. The result goes through translateToEnglish.

    Fix: for multipart messages get_payload() returns a list of Message
    objects, which crashed re.sub — the textual payloads of the parts are
    now joined first.
    """
    msg = email.message_from_string(emailText)
    payload = msg.get_payload()
    if isinstance(payload, list):
        # multipart message: concatenate the string payloads of the parts
        payload = "\n".join(
            part.get_payload() for part in payload
            if isinstance(part.get_payload(), str)
        )
    emailBody = re.sub(r"http\S+", "", payload)  # remove hyperlinks
    subject = msg.get('Subject') or ''
    rawEmail = emailText
    if "<html" in emailBody.lower():
        rawEmail = subject + '\n' + processHtmlMail(emailBody)  # add back subject of the email
    elif is64Encoded(emailBody):
        rawEmail = subject + '\n' + process64EncodedMail(emailBody)  # add back subject of the email
    return translateToEnglish(rawEmail)
def is64Encoded(emailBody):
    """Heuristically decide whether *emailBody* is a base64-encoded blob.

    Base64 mail bodies may be wrapped across lines but never contain
    spaces. Fixes: decoding now passes validate=True — without it,
    b64decode silently discards every non-alphabet character, so plenty of
    plain text was misclassified as base64 — and an empty or
    whitespace-only body is no longer reported as encoded.
    """
    if ' ' in emailBody.strip():
        return False
    # collapse the line wrapping before validating
    compact = "".join(emailBody.split())
    if not compact:
        return False
    try:
        base64.b64decode(compact, validate=True)
        return True
    except (binascii.Error, ValueError):
        return False
def processHtmlMail(emailText):
    """Strip all HTML markup from *emailText* and return the plain text."""
    plain_text = remove_tags(emailText)
    return plain_text
def process64EncodedMail(emailText):
    """Decode a base64 email body and flatten its line breaks into spaces."""
    decoded = base64.b64decode(emailText).decode('latin-1')
    # CRLF is matched first so each line break becomes exactly one space
    return re.sub(r"\r\n|\n", " ", decoded)
def prepareEmail(rawEmail):
    """Convert a raw email string into an NLTK feature dict ({word: True}).

    Fix: the parameter was named ``email``, shadowing the stdlib ``email``
    module imported at the top of the file; it is renamed (all in-file
    callers pass it positionally).
    """
    return {word: True for word in extractEnglishWords(extractText(rawEmail))}
def doCleanliness(email_folder, output_file, classifier):
    """Classify every email file in *email_folder* and write one
    "filename|cln" or "filename|inf" line per file to *output_file*.

    Fixes: files are opened with context managers (no leaked handles on
    error), the output is assembled with a list + join instead of
    quadratic string concatenation, and paths stay as str — the bytes
    round-trip through os.fsencode/decode was unnecessary.
    """
    lines = []
    for name in os.listdir(email_folder):
        with open(os.path.join(email_folder, name), "r", encoding="latin1") as f:
            emailToTest = prepareEmail(f.read())
        verdict = "cln" if classifier.classify(emailToTest) == "cln" else "inf"
        lines.append(name + "|" + verdict + "\n")
    writeInFile(output_file, "".join(lines).strip())
def readEmails(path):
    """Read every file in directory *path* and return their contents as a
    list of strings (latin-1 decoded).

    Fixes: os.path.join instead of raw string concatenation (the original
    silently built wrong paths when *path* lacked a trailing separator)
    and a context manager so handles are closed even if a read fails.
    """
    emails = []
    for name in os.listdir(path):
        with open(os.path.join(path, name), "r", encoding="latin1") as f:
            emails.append(f.read())
    return emails
def getTrainingData(clean_dir="/Users/dan.nastasa/Projects/antiSpamMalw/Lot1_/Lot1/Clean/",
                    spam_dir="/Users/dan.nastasa/Projects/antiSpamMalw/Lot1_/Lot1/Spam/"):
    """Build a shuffled list of (feature_dict, label) training pairs.

    Labels are "cln" for clean mails and "inf" for spam. The corpus
    locations, previously hard-coded, are now overridable parameters whose
    defaults preserve the original behavior.
    """
    labelled = [(mail, 'cln') for mail in readEmails(clean_dir)]
    labelled += [(mail, 'inf') for mail in readEmails(spam_dir)]
    random.shuffle(labelled)
    return [(prepareEmail(mail), label) for (mail, label) in labelled]
def trainClassifier():
    """Train a Naive Bayes spam classifier on the default training corpus."""
    features = getTrainingData()
    print("Extracted features")
    model = NaiveBayesClassifier.train(features)
    print("Finished training")
    return model
def getClassifier():
    """Return the spam classifier, loading a cached pickle stored next to
    this module when present, otherwise training one and caching it.

    Fix: both pickle files are now opened with context managers so the
    handles are closed even when pickle raises.
    """
    fn = os.path.join(os.path.dirname(__file__), 'classifier-l1-l2.pickle')
    if os.path.exists(fn):
        print("Classifier already exists")
        # NOTE(review): pickle.load can execute arbitrary code — acceptable
        # only because this cache file is produced by this program itself.
        with open(fn, "rb") as f:
            classifier = pickle.load(f)
    else:
        print("Classifier does not exist. Training from scratch.")
        classifier = trainClassifier()
        with open(fn, "wb") as f:
            pickle.dump(classifier, f)
    return classifier
if __name__ == '__main__':
    # CLI: "-info <output_file>" writes app metadata;
    #      "-scan <email_folder> <output_file>" classifies a mailbox folder.
    try:
        if str(sys.argv[1]) == "-info" and len(sys.argv) == 3:
            print(str(sys.argv[1]) + str(sys.argv[2]))
            doInfo(sys.argv[2])
        elif str(sys.argv[1]) == "-scan" and len(sys.argv) == 4:
            classifier = getClassifier()
            doCleanliness(sys.argv[2], sys.argv[3], classifier)
        else:
            print("wrong command")
    except Exception:
        # Fixes: bare "except:" also swallowed SystemExit/KeyboardInterrupt,
        # and print(traceback.print_exc()) printed a spurious "None" —
        # print_exc() already writes the traceback itself.
        traceback.print_exc()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement