import numpy as np
from time import time
from time import sleep
import multiprocessing as mp
from corpus_cleaner import CorpusCleaner
import os
import json
import glob
import sys
import gc
import pickle

CONV_THR = 0.8


def pool_init():
    gc.collect()
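
# Per-token collapsed Gibbs update used by inference() and inference_parallel() below:
#
#     p(z_i = k | z_-i, w)  ∝  (n_{k, w_i} + beta) * (n_{m, k} + alpha) / (n_k + V * beta)
#
# where n_{k, w_i} is the topic-word count, n_{m, k} the document-topic count, and
# n_k the number of tokens assigned to topic k. Note that z_t is initialized with the
# beta prior already added, so the sampling code does not add beta to the numerator again.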
class LDACollapsedGibbs:
    def __init__(self, docs, K, V, voca, n_iter, alpha, beta):
        self.K = K
        self.V = V
        self.alpha = alpha
        self.beta = beta
        self.docs = docs
        self.voca = voca
        self.n_iter = n_iter
        self.start = None
        self.end = None
        self.m_z = np.zeros((len(self.docs), K))  # docs vs topics
        self.z_t = np.zeros((K, V)) + self.beta   # topics vs vocab (beta prior included)
        self.n_z = np.zeros(K)                    # word count per topic
        self.m_t = []                             # per-doc topic assignment of each token
        # self.m_z = sparse.csr_matrix(self.m_z)
        # self.z_t = sparse.csr_matrix(self.z_t)
        # self.n_z = sparse.csr_matrix(self.n_z)

        # random initialization: draw one topic distribution and assign topics from it
        theta = np.random.dirichlet([self.alpha] * self.K, 1)
        theta = theta[0, :]
        for m, doc in enumerate(docs):
            pr = []
            for t in doc:
                # p_z = (self.z_t[:, t] + self.beta) * (curr_m_z + self.alpha) / (self.n_z + self.V * self.beta)
                # z = np.random.multinomial(1, p_z / p_z.sum()).argmax()
                z = np.random.multinomial(1, theta / theta.sum()).argmax()
                self.m_z[m, z] += 1
                self.z_t[z, t] += 1
                self.n_z[z] += 1
                pr.append(z)
            self.m_t.append(np.array(pr))
        self.learning()
    def worddist(self):
        """get topic-word distribution"""
        return self.z_t / self.n_z[:, np.newaxis]
    def inference(self):
        """One full sweep of collapsed Gibbs sampling over all documents."""
        for m, doc in enumerate(self.docs):
            curr_m_z = self.m_z[m]
            curr_m_t = self.m_t[m]
            for n, t in enumerate(doc):
                # remove the token's current assignment from the counts
                z = curr_m_t[n]
                curr_m_z[z] -= 1
                self.n_z[z] -= 1
                self.z_t[z, t] -= 1
                # p(topic|doc) * p(word|topic); z_t already carries the beta prior
                p_z = self.z_t[:, t] * (curr_m_z + self.alpha) / (self.n_z + self.V * self.beta)
                new_z = np.random.multinomial(1, p_z / p_z.sum()).argmax()
                curr_m_t[n] = new_z
                curr_m_z[new_z] += 1
                self.n_z[new_z] += 1
                self.z_t[new_z, t] += 1
    def merge_topic(self):
        # unfinished: the smallest topic is the candidate to be merged away
        to_be_merged_topic = self.n_z.argmin()
    def is_converged(self):
        """Heuristic convergence check: stop once the rows of the topic-word matrix are decorrelated."""
        row_sum = self.z_t.sum(axis=1)
        norm_z_t = self.z_t / row_sum[:, np.newaxis]
        corr_matrix = np.corrcoef(norm_z_t)
        det = np.linalg.det(corr_matrix)
        print "DET: ", str(det)
        if det >= CONV_THR:
            return True
        return False
        # self.n_z[np.where((self.n_z - self.n_z.mean()) > self.n_z.std())]
    def get_word_topic_dist(self):
        topics = []
        zcount = np.zeros(self.K, dtype=int)
        wordcount = [dict() for k in range(self.K)]
        for xlist, zlist in zip(self.docs, self.m_t):
            for x, z in zip(xlist, zlist):
                zcount[z] += 1
                if x in wordcount[z]:
                    wordcount[z][x] += 1
                else:
                    wordcount[z][x] = 1
        phi = self.worddist()
        for k in range(self.K):
            words = []
            for w in np.argsort(-phi[k]):
                words.append(self.voca[w])
            topics.append(words)
        return topics
    def print_word_topic_dist(self, num_words=10):
        zcount = np.zeros(self.K, dtype=int)
        wordcount = [dict() for k in range(self.K)]
        for xlist, zlist in zip(self.docs, self.m_t):
            for x, z in zip(xlist, zlist):
                zcount[z] += 1
                if x in wordcount[z]:
                    wordcount[z][x] += 1
                else:
                    wordcount[z][x] = 1
        phi = self.worddist()
        for k in range(self.K):
            print "\n----------Topic#:%d-----------------" % k
            # print ("\n-- topic: %d (%d words)" % (k, zcount[k]))
            for w in np.argsort(-phi[k])[:num_words]:
                print ("%s" % (self.voca[w]))
                # print ("%s: %f (%d)" % (self.voca[w], phi[k, w], wordcount[k].get(w, 0)))
    def learning(self):
        for epoch in range(self.n_iter):
            st = time()
            self.inference()
            print "epoch:%d time:%s" % (epoch, str(time() - st))
            # np.savetxt("doc_topic.csv", self.m_z, delimiter=",")
            # np.savetxt("topic_words.csv", self.z_t, delimiter=",")

def inference_parallel(tup):
    """One Gibbs sweep over the document slice [start, end) on a worker's copy of the model."""
    lda, start, end = tup
    lda.start = start
    lda.end = end
    # if hasattr(os, 'getppid'):  # only available on Unix
    #     print 'parent process:', os.getppid()
    #     print 'process id:', os.getpid()
    for m in range(start, end):
        doc = lda.docs[m]
        curr_m_z = lda.m_z[m]
        curr_m_t = lda.m_t[m]
        for n, t in enumerate(doc):
            z = curr_m_t[n]
            curr_m_z[z] -= 1
            lda.n_z[z] -= 1
            lda.z_t[z, t] -= 1
            # p(topic|doc) * p(word|topic); z_t already carries the beta prior
            p_z = lda.z_t[:, t] * (curr_m_z + lda.alpha) / (lda.n_z + lda.V * lda.beta)
            new_z = np.random.multinomial(1, p_z / p_z.sum()).argmax()
            curr_m_t[n] = new_z
            curr_m_z[new_z] += 1
            lda.n_z[new_z] += 1
            lda.z_t[new_z, t] += 1
    return lda
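
# Each pool worker receives a full copy of the model and resamples only its own slice
# of documents, so the copies must be recombined after every parallel sweep. A minimal
# sketch of the merge performed in __main__ below, assuming P workers whose copies
# local_p all started from the same pre-sweep counts `old`:
#
#     merged = sum_p local_p - (P - 1) * old = old + sum_p (local_p - old)
#
# i.e. only each worker's own increments are kept; the per-document assignments m_t
# are copied back slice by slice.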

class Corpus:
    def __init__(self):
        self.word2id = {}
        self.id2word = {}

    def build_vocab(self, docs):
        count = 0
        for doc in docs:
            for t in doc:
                if t not in self.word2id:
                    self.word2id[t] = count
                    count += 1
        for key, val in self.word2id.iteritems():
            self.id2word[val] = key

    def get_vocab(self):
        return self.id2word

    def doc2bow(self, doc):
        res = []
        for term in doc:
            res.append(self.word2id[term])
        return res
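
# A minimal usage sketch on a toy corpus (illustrative only; the toy documents and the
# parameter values here are made up and are not part of the pipeline in __main__):
#
#     toy_docs = [["cat", "dog", "cat"], ["dog", "fish"], ["cat", "fish", "fish"]]
#     corpus = Corpus()
#     corpus.build_vocab(toy_docs)
#     bow = [corpus.doc2bow(d) for d in toy_docs]
#     model = LDACollapsedGibbs(bow, K=2, V=len(corpus.get_vocab()),
#                               voca=corpus.get_vocab(), n_iter=5, alpha=0.5, beta=0.1)
#     model.print_word_topic_dist(num_words=3)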

if __name__ == '__main__':
    docs = []

    # distributed run settings
    no_procs = 8
    st = time()
    print "Starting parallel Runs"
    # gc.collect()
    pool = mp.Pool(processes=no_procs, initializer=pool_init)

    # load the preprocessed documents (one JSON object per line)
    files = glob.glob("data/hsft_supercell/hsft_supercell_21_Mar_2017/data_en_preprocess_Jan_2017_supercell/*")
    doc_count = 0
    for file in files:
        with open(file) as fp:
            for line in fp:
                j_content = json.loads(line)
                issue = j_content.get("unigrams_cust")
                docs.append(issue)
                doc_count += 1
                # if doc_count >= 100000:
                #     break
    print "Files#:%d Doc_Count:%d" % (len(files), doc_count)
    print "Data load time:%s" % str((time() - st))
    print "Data loaded...."
    MIN_FREQ = 2
    MIN_DF = 2
    CUT_OFF_TOP_PERCENTILE = 0.15
    CUT_OFF_BOTTOM_PERCENTILE = 0.15
    NUM_TOPICS = 10
    NUM_WORDS = 10

    # model parameters
    LDA_GIBBS_ITER = 1
    LDA_ALPHA = 5
    LDA_BETA = 0.1

    if os.path.isfile('lda_model.pickle'):
        print "Loading a pickle file"
        with open('lda_model.pickle', 'rb') as handle:
            lda_model = pickle.load(handle)
    else:
        st = time()
        corpus = CorpusCleaner(docs)
        cleaned_docs = corpus.clean(min_freq=MIN_FREQ, min_df=MIN_DF,
                                    cut_off_top_percentile=CUT_OFF_TOP_PERCENTILE,
                                    cut_off_bottom_percentile=CUT_OFF_BOTTOM_PERCENTILE)
        print "Data clean time:%s" % str((time() - st))
        print "Data cleaning done...."

        st = time()
        corpus = Corpus()
        corpus.build_vocab(cleaned_docs)
        dictionary = corpus.get_vocab()
        doc_term_matrix = [corpus.doc2bow(doc) for doc in cleaned_docs]
        print "preparation:%s" % str((time() - st))
        print "Docs#:%d Vocab#:%d" % (len(doc_term_matrix), len(dictionary))

        lda_model = LDACollapsedGibbs(doc_term_matrix, NUM_TOPICS, len(dictionary), dictionary,
                                      n_iter=LDA_GIBBS_ITER, alpha=LDA_ALPHA, beta=LDA_BETA)
        with open('lda_model.pickle', 'wb') as handle:
            pickle.dump(lda_model, handle, protocol=pickle.HIGHEST_PROTOCOL)
        print "Done pickling"
        # print "Objects", str(len(gc.get_objects()))
        # print sys.getsizeof(lda_model)
    # run distributed Gibbs sweeps until the convergence heuristic fires
    st = time()
    doc_length = len(lda_model.docs)
    # split the corpus into no_procs contiguous chunks
    bins = [i * doc_length // no_procs for i in range(no_procs + 1)]
    for par_run in range(300):
        print "Parallel Run :", str(par_run),
        fun_args = []
        for i in range(len(bins) - 1):
            fun_args.append((lda_model, bins[i], bins[i + 1]))
        res = pool.map_async(inference_parallel, fun_args)
        res = res.get()

        # merge the worker copies: keep only each worker's own increments
        num_workers = len(fun_args)
        lda_model.m_z = lda_model.m_z * (-(num_workers - 1))
        lda_model.z_t = lda_model.z_t * (-(num_workers - 1))
        lda_model.n_z = lda_model.n_z * (-(num_workers - 1))
        # st = time()
        for i, lda in enumerate(res):
            lda_model.m_z = lda_model.m_z + lda.m_z
            lda_model.z_t = lda_model.z_t + lda.z_t
            lda_model.n_z = lda_model.n_z + lda.n_z
            # print lda.start, lda.end
            lda_model.m_t[bins[i]:bins[i + 1]] = lda.m_t[bins[i]:bins[i + 1]]
        if lda_model.is_converged():
            print "Model Converged in ", str(par_run)
            break
    print "Parallel Run :%s" % str((time() - st))
    pool.close()
    pool.terminate()