Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- # example_python_n-gram_parser.py
- # not as tidy as it might be,
- # but you should get the general idea.
- # Define:
- # master_words, a list of master words to check n-grams against
- # LIST_OF_5GRAMS_FROM_LP
- # LIST_OF_FILENAMES (google data)
- # PATH_TO_COMPRESSED_DATA
- # PATH_TO_OUTPUT_FOLDER
- import gzip
- import time
- import csv
- import os
- import time
- from master_words import LIST_OF_FILENAMES
- from LP_ngrams import LIST_OF_5GRAMS_FROM_LP
class get_rune_length_code():
    """Map tokens to the "rune length" codes used for n-gram matching.

    An alphabetic word becomes the number of runes needed to spell it
    (multi-letter runes such as 'TH' or 'ING' count as one), and
    punctuation collapses to a canonical '.', ',', '"' or "'".
    """

    def __init__(self):
        # Latin transliterations of the futhorc runes (kept for reference).
        self.latin_fragments = ['F', 'U', 'TH', 'O', 'R', 'C', 'G', 'W', 'H', 'N', 'I',
                                'J', 'EO', 'P', 'X', 'S', 'T', 'B', 'E', 'M', 'L', 'NG',
                                'OE', 'D', 'A', 'AE', 'Y', 'IA', 'EA']
        self.period = "."
        self.quote = "\""
        self.apostrophe = "'"
        self.colon = ":"
        self.semicolon = ";"
        self.comma = ","
        self.exclamation = "!"
        self.question = "?"
        # Letter pairs written with a single rune.
        self.bigram = ['TH', 'EO', 'NG', 'OE', 'AE', 'IA', 'IO', 'EA']
        # (name kept for backward compatibility -- "trigram" is meant)
        self.trgram = 'ING'

    def translate_to_gematria(self, word):  # thanks to 'solvers
        """Split *word* into rune fragments, longest match first:
        'ING' before the two-letter runes, those before single letters."""
        res = []
        skip = 0
        WORD = word.upper()
        for i, val in enumerate(WORD):
            if skip:  # character already consumed by a multi-letter rune
                skip -= 1
                continue
            if WORD[i:i + 3] == self.trgram:
                res.append(self.trgram)
                skip += 2
                continue
            if WORD[i:i + 2] in self.bigram:
                res.append(WORD[i:i + 2])
                skip += 1
                continue
            res.append(val)
        return res

    def get_code(self, string):
        """Return the code for one token.

        '.', '?', '!' -> '.'; ':', ';', ',' -> ','; quote and apostrophe
        map to themselves; alphabetic words map to their rune length;
        anything else yields the sentinel -4.
        """
        # Mapping replaces the original 10-branch if/elif chain.
        punctuation_code = {
            self.period: self.period,
            self.question: self.period,
            self.exclamation: self.period,
            self.quote: self.quote,
            self.apostrophe: self.apostrophe,
            self.colon: self.comma,
            self.semicolon: self.comma,
            self.comma: self.comma,
        }
        if string in punctuation_code:
            return punctuation_code[string]
        if string.isalpha():
            return len(self.translate_to_gematria(string))
        return -4  # sentinel: token cannot be coded

    def get_code_list(self, strings):
        """Code every token in *strings* (the 5-gram word lengths checked
        against the LP 5-grams); return False as soon as any token is
        uncodeable (-4), otherwise the list of codes."""
        r = []
        for string in strings:
            a = self.get_code(string)
            r.append(a)
            if a == -4:
                return False
        return r
class compressed_ngram_parser():
    """Stream compressed Google Books 5-gram files, keep the phrases whose
    rune-length code matches one of the LP 5-grams, then tidy the kept
    lines into master / non-master output files."""

    def __init__(self):
        self.to_rune_latin = get_rune_length_code()
        # "_.", "_END_" are included because of punctuation;
        # "_END_" is superfluous assuming there is always a "." (?)
        self.google_pos_tags = ["_NOUN", "_VERB", "_ADJ", "_ADV", "_PRON",
                                "_DET", "_ADP", "_NUM", "_CONJ", "_PRT",
                                "_X", "_.", "_END_"]
        # Standalone POS placeholder tokens: a phrase containing one of
        # these is not a sequence of real words and is rejected.
        self.google_pos_word = {"_NOUN_", "_VERB_", "_ADJ_", "_ADV_",
                                "_PRON_", "_DET_", "_ADP_", "_NUM_",
                                "_CONJ_", "_PRT_", "_ROOT_", "_X_"}
        self.punctuation = [",", "_", ":", "!", "_",
                            "\"", "'", "?", "-", ")", "(", "[", "]"]
        # NOTE(review): `master_words` must be defined at module level
        # before instantiation -- see the header comment of this file.
        self.master_words_set = set(master_words)
        self.master_words_set.update(self.google_pos_word)
        self.words_to_check = []
        self.code = None    # rune-length code of the phrase last checked
        self.phrases = None
        self.valid_characters = \
            set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ,.\";:' ?!")
        # Single-letter codes used to minimise the output line size.
        self.pos_code = {"_NOUN": "N", "_VERB": "V", "_ADJ": "J",
                         "_ADV": "D", "_PRON": "P", "_DET": "E",
                         "_ADP": "A", "_NUM": "U", "_CONJ": "C",
                         "_PRT": "R", "_X": "X", "_.": ".",
                         "_MASTER": "M", "_NOTMASTER": "S", "_": "_"}

    def remove_google_pos_tags(self, word):
        """Strip every Google POS tag from *word* and lowercase it."""
        return self.remove_google_pos_tags_2(word).lower()

    def remove_google_pos_tags_2(self, word):
        """Strip every Google POS tag from *word*, preserving case."""
        for tag in self.google_pos_tags:
            word = word.replace(tag, '')
        return word

    def string_has_valid_characters(self, string):
        """True when *string* uses only whitelisted characters."""
        return set(string) <= self.valid_characters

    def string_has_google_pos_word(self, string):
        """True when any standalone POS placeholder occurs in *string*."""
        return any(a in string for a in self.google_pos_word)

    def is_valid_string(self, string):
        """Valid = no POS placeholder token and only whitelisted chars."""
        if self.string_has_google_pos_word(string):
            return False
        return self.string_has_valid_characters(string)

    def check_ngram(self, n_gram_phrase):
        """Code *n_gram_phrase* and test it against the LP 5-gram codes.

        Side effects: stores the split phrase in self.phrase and its code
        in self.code for later use by get_data_to_keep().
        """
        # NOTE(review): grams5 (the LP 5-gram code list) is defined
        # elsewhere -- presumably built from LIST_OF_5GRAMS_FROM_LP.
        global grams5
        phrase = self.remove_google_pos_tags(n_gram_phrase)
        if not self.is_valid_string(phrase):
            return False
        self.phrase = phrase.split()
        self.code = self.to_rune_latin.get_code_list(self.phrase)
        if self.code is False:  # some token could not be coded
            return False
        return self.code in grams5

    def get_data_to_keep(self, previous_line_phrase, total_count):
        """Build the output line for a kept phrase: raw phrase, summed
        count, its code, and a _MASTER/_NOTMASTER marker.

        Relies on self.code / self.phrase set by the last check_ngram().
        """
        to_write = previous_line_phrase + " " + str(total_count) + " "
        to_write += ' '.join(str(e) for e in self.code)
        if self.are_in_master_words_set(self.phrase):
            to_write += ' _MASTER '
        else:
            to_write += ' _NOTMASTER '
        return to_write

    def are_in_master_words_set(self, words):
        """True when every word (case-insensitively) is a master word."""
        lower_words = [x.lower() for x in words]
        # BUG FIX: the original used `<` (proper subset), which would
        # wrongly reject a phrase whose word set equals the master set.
        return set(lower_words) <= self.master_words_set

    def get_data(self, ngram_file, output):
        """Main loop: stream *ngram_file* (gzipped, tab-delimited), sum
        the counts of consecutive identical phrases, and write phrases
        whose code matches an LP 5-gram to the *output* path."""
        count = 0
        total_count = 0
        previous_line_phrase = None
        should_keep = False
        # (the original shadowed the `output` parameter with the handle)
        with open(output, 'w') as out_fh:
            # open compressed file for reading line by line
            # NOTE(review): 'rb' + csv.reader is Python 2 usage; on
            # Python 3 this would need gzip.open(ngram_file, 'rt').
            with gzip.open(ngram_file, 'rb') as in_fh:
                # reader object, data is tab delimited
                reader = csv.reader(in_fh, delimiter='\t')
                for line in reader:
                    # same phrase as the previous line: accumulate count
                    if line[0] == previous_line_phrase and should_keep:
                        total_count += int(line[-2])
                    # or it's the first pass
                    elif previous_line_phrase is None:
                        should_keep = self.check_ngram(line[0])
                        total_count = int(line[-2])
                    # or a new phrase: flush the old one if it was kept
                    elif line[0] != previous_line_phrase:
                        if should_keep:
                            out_fh.write(self.get_data_to_keep(
                                previous_line_phrase, total_count))
                            out_fh.write("\n")
                        should_keep = self.check_ngram(line[0])
                        total_count = int(line[-2])
                    previous_line_phrase = line[0]
                    count += 1
                    if count == 50000:  # sample/debug limit
                        break

    def is_in_data_totidy(self, data, datatofind, part):
        """Index of the first item with item[part] == datatofind, or -1."""
        data_all_part = [item[part] for item in data]
        try:
            return data_all_part.index(datatofind)
        except ValueError:
            return -1

    def has_pos_tags(self, line):
        """True when *line* contains any Google POS tag."""
        return any(tag in line for tag in self.google_pos_tags)

    def get_tags(self, word):
        """First Google POS tag found in *word*, or '_' when untagged."""
        for tag in self.google_pos_tags:
            if tag in word:
                return tag
        return "_"

    def get_tags_list(self, wordlist):
        """Single-letter POS code for each word in *wordlist*."""
        return [self.pos_code[self.get_tags(word)] for word in wordlist]

    def get_combo_data(self, line):
        """Parse one kept-output line into
        [words+tags, has_pos_tags, count, other_tags, raw line]."""
        pos_tags = self.has_pos_tags(line)
        line_split = line.split()
        words = self.remove_google_pos_tags(line).split()[0:5]
        tags = self.get_tags_list(line_split[0:5])
        count = int(line_split[5])
        other_tags = line_split[6:]
        return [words + tags, pos_tags, count, other_tags, line]

    # garbage, but it only takes a few minutes to run
    def tidy_files(self, inputfile, masteroutputfile, nonmasteroutputfile):
        """Rewrite kept lines in compact form, routing _MASTER lines to
        *masteroutputfile* and _NOTMASTER lines to *nonmasteroutputfile*."""
        counter = 0
        with open(inputfile, 'r') as in_fh, \
                open(masteroutputfile, 'w') as master, \
                open(nonmasteroutputfile, 'w') as nonmaster:
            for line in in_fh:
                line_split = line.split()
                tags = self.get_tags_list(line_split[0:5])
                line_no_tags = self.remove_google_pos_tags_2(line)
                line_no_tags_split = line_no_tags.split()
                words_string = ' '.join(line_no_tags_split[0:5])
                tags_string = ' '.join(tags)
                length_string = ' '.join(line_no_tags_split[6:-1])
                master_string = self.pos_code[line_no_tags_split[-1]]
                count_string = line_no_tags_split[5]
                towrite = words_string + ' '
                towrite += tags_string + ' '
                towrite += length_string + ' '
                towrite += master_string + ' '
                towrite += count_string + '\n'
                if line_no_tags_split[-1] == '_MASTER':
                    master.write(towrite)
                elif line_no_tags_split[-1] == '_NOTMASTER':
                    nonmaster.write(towrite)
                counter += 1
                if counter == 5000:  # sample/debug limit
                    break
# --- driver: first pass over the raw compressed Google n-gram data ---
# NOTE(review): the original line here read "from files import fns (...)",
# which is not valid Python.  Per the header comment, fns is the list of
# raw google-data .gz filenames (LIST_OF_FILENAMES) -- confirm against
# the real `files` module if it exists.
fns = LIST_OF_FILENAMES
in_root = PATH_TO_COMPRESSED_DATA
out_root = PATH_TO_OUTPUT_FOLDER
parser = compressed_ngram_parser()
for file in fns:
    in_file = in_root + file
    out_file = out_root + file[0:-2] + "txt"  # "foo.gz" -> "foo.txt"
    print(in_file)
    print(out_file)
    t1 = time.time()
    parser.get_data(in_file, out_file)
    t2 = time.time()
    # BUG FIX: the original printed fns[0] on every iteration; report
    # the file just processed instead (elapsed time in minutes).
    print(file + " " + str((t2 - t1) / 60))

# --- second pass: some tidying, and minimising data_string size ---
in_root = PATH_TO_OUTPUT_FOLDER
fns = os.listdir(in_root)
out_rootm = PATH_TO_OUTPUT_FOLDER_M
out_rootn = PATH_TO_OUTPUT_FOLDER_N
t1 = time.time()
for file in fns:
    inf = in_root + file
    outm = out_rootm + file[-6:-4] + "_master.txt"
    outn = out_rootn + file[-6:-4] + "_non_master.txt"
    parser.tidy_files(inf, outm, outn)
print("TIME = " + str((time.time() - t1) / 60))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement