
Numpy LDA

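# LDA trained with collapsed Gibbs sampling in NumPy, plus an approximate
# data-parallel training loop: pool workers sample disjoint document ranges
# against a snapshot of the global counts, which are then merged back,
# in the spirit of AD-LDA.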
import numpy as np
from time import time
import multiprocessing as mp
from corpus_cleaner import CorpusCleaner
import os
import json
import glob
import sys
import gc
import pickle

# convergence threshold on the determinant of the topic-topic correlation matrix
CONV_THR = 0.8

def pool_init():
    # worker initializer: force a garbage-collection pass in each pool process
    import gc
    gc.collect()

class LDACollapsedGibbs:
    """LDA trained with collapsed Gibbs sampling."""

    def __init__(self, docs, K, V, voca, n_iter, alpha, beta):

        self.K = K
        self.V = V
        self.alpha = alpha
        self.beta = beta
        self.docs = docs
        self.voca = voca
        self.n_iter = n_iter
        self.start = None
        self.end = None

        self.m_z = np.zeros((len(self.docs), K))  # doc-topic counts
        self.z_t = np.zeros((K, V)) + self.beta  # topic-word counts (beta smoothing pre-added)
        self.n_z = np.zeros(K)  # total word count per topic
        self.m_t = []  # topic assignment of each token, per document

        # self.m_z = sparse.csr_matrix(self.m_z)
        # self.z_t = sparse.csr_matrix(self.z_t)
        # self.n_z = sparse.csr_matrix(self.n_z)

        # initialization: draw a single topic distribution theta and assign each
        # token a topic sampled from it, updating the count matrices as we go
        theta = np.random.dirichlet([self.alpha] * self.K, 1)
        theta = theta[0, :]
        for m, doc in enumerate(docs):
            pr = []
            for t in doc:
                # p_z = (self.z_t[:,t]+self.beta)*(curr_m_z+self.alpha)/(self.n_z+self.V*self.beta)
                # z = np.random.multinomial(1,p_z/p_z.sum()).argmax()
                z = np.random.multinomial(1, theta / theta.sum()).argmax()
                self.m_z[m, z] += 1
                self.z_t[z, t] += 1
                self.n_z[z] += 1
                pr.append(z)
            self.m_t.append(np.array(pr))

        self.learning()

    def worddist(self):
        """get topic-word distribution (rows of z_t sum to n_z + V*beta because beta is pre-added)"""
        return self.z_t / (self.n_z[:, np.newaxis] + self.V * self.beta)

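    # inference() below resamples the topic of every token from the collapsed
    # Gibbs conditional
    #     p(z = k | rest)  proportional to  (n_kt + beta) * (n_mk + alpha) / (n_k + V * beta)
    # where n_kt counts word t in topic k, n_mk counts topic k in document m,
    # and n_k is the total number of tokens assigned to topic k.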
    def inference(self):
        for m, doc in enumerate(self.docs):
            curr_m_z = self.m_z[m]
            curr_m_t = self.m_t[m]
            for n, t in enumerate(doc):
                # remove the current token's assignment from the counts
                z = curr_m_t[n]
                curr_m_z[z] -= 1
                self.n_z[z] -= 1
                self.z_t[z, t] -= 1
                # p(topic|doc) * p(word|topic); beta is already folded into z_t
                p_z = self.z_t[:, t] * (curr_m_z + self.alpha) / (self.n_z + self.V * self.beta)
                new_z = np.random.multinomial(1, p_z / p_z.sum()).argmax()
                # add the token back under the newly sampled topic
                curr_m_t[n] = new_z
                curr_m_z[new_z] += 1
                self.n_z[new_z] += 1
                self.z_t[new_z, t] += 1

    def merge_topic(self):
        # stub: picks the smallest topic but does not merge it anywhere yet
        to_be_merged_topic = self.n_z.argmin()

    def is_converged(self):
        # convergence heuristic: normalize the topic-word counts and check the
        # determinant of their correlation matrix; it approaches 1 as topics decorrelate
        row_sum = self.z_t.sum(axis=1)
        norm_z_t = self.z_t / row_sum[:, np.newaxis]
        corr_matrix = np.corrcoef(norm_z_t)
        det = np.linalg.det(corr_matrix)
        print "DET: %s" % str(det)
        return det >= CONV_THR

        # self.n_z[np.where((self.n_z - self.n_z.mean()) > self.n_z.std())]

    def get_word_topic_dist(self):
        topics = []
        zcount = np.zeros(self.K, dtype=int)
        wordcount = [dict() for k in range(self.K)]
        for xlist, zlist in zip(self.docs, self.m_t):
            for x, z in zip(xlist, zlist):
                zcount[z] += 1
                if x in wordcount[z]:
                    wordcount[z][x] += 1
                else:
                    wordcount[z][x] = 1

        phi = self.worddist()
        for k in range(self.K):
            words = []
            for w in np.argsort(-phi[k]):
                words.append(self.voca[w])
            topics.append(words)
        return topics

    def print_word_topic_dist(self, num_words=10):
        zcount = np.zeros(self.K, dtype=int)
        wordcount = [dict() for k in range(self.K)]
        for xlist, zlist in zip(self.docs, self.m_t):
            for x, z in zip(xlist, zlist):
                zcount[z] += 1
                if x in wordcount[z]:
                    wordcount[z][x] += 1
                else:
                    wordcount[z][x] = 1

        phi = self.worddist()
        for k in range(self.K):
            print "\n----------Topic#:%d-----------------" % k
            # print ("\n-- topic: %d (%d words)" % (k, zcount[k]))
            for w in np.argsort(-phi[k])[:num_words]:
                print ("%s" % (self.voca[w]))
                # print ("%s: %f (%d)" % (self.voca[w], phi[k, w], wordcount[k].get(w, 0)))

    def learning(self):
        for epoch in range(self.n_iter):
            st = time()
            self.inference()
            print "epoch:%d time:%s" % (epoch, str(time() - st))
            # np.savetxt("doc_topic.csv", self.m_z, delimiter=",")
            # np.savetxt("topic_words.csv", self.z_t, delimiter=",")

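# inference_parallel() runs the same Gibbs sweep as LDACollapsedGibbs.inference(),
# but only over the document range [start, end) of the model copy each pool worker
# receives. Workers therefore sample against a stale snapshot of the global counts;
# the copies are reconciled in the driver loop at the bottom of the script.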
def inference_parallel(tup):
    lda, start, end = tup
    lda.start = start
    lda.end = end

    # if hasattr(os, 'getppid'):  # only available on Unix
    #     print 'parent process:', os.getppid()
    # print 'process id:', os.getpid()

    for m in range(start, end):
        doc = lda.docs[m]
        curr_m_z = lda.m_z[m]
        curr_m_t = lda.m_t[m]
        for n, t in enumerate(doc):
            z = curr_m_t[n]
            curr_m_z[z] -= 1
            lda.n_z[z] -= 1
            lda.z_t[z, t] -= 1
            # p(topic|doc) * p(word|topic); beta is already folded into z_t
            p_z = lda.z_t[:, t] * (curr_m_z + lda.alpha) / (lda.n_z + lda.V * lda.beta)
            new_z = np.random.multinomial(1, p_z / p_z.sum()).argmax()

            curr_m_t[n] = new_z
            curr_m_z[new_z] += 1
            lda.n_z[new_z] += 1
            lda.z_t[new_z, t] += 1
    return lda


class Corpus:
    def __init__(self):
        self.word2id = {}
        self.id2word = {}

    def build_vocab(self, docs):
        count = 0
        for doc in docs:
            for t in doc:
                if t not in self.word2id:
                    self.word2id[t] = count
                    count += 1
        for key, val in self.word2id.iteritems():
            self.id2word[val] = key

    def get_vocab(self):
        return self.id2word

    def doc2bow(self, doc):
        res = []
        for term in doc:
            res.append(self.word2id[term])
        return res

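# Example (sketch, with made-up tokens) of how Corpus maps documents to id lists:
#   corpus = Corpus()
#   corpus.build_vocab([["refund", "crash"], ["crash", "login"]])
#   corpus.doc2bow(["crash", "login"])   # -> [1, 2]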

if __name__ == '__main__':

    docs = []

    # distributed run: number of worker processes
    no_procs = 8

    st = time()
    print "Starting parallel Runs"
    # gc.collect()
    pool = mp.Pool(processes=no_procs, initializer=pool_init)

    files = glob.glob("data/hsft_supercell/hsft_supercell_21_Mar_2017/data_en_preprocess_Jan_2017_supercell/*")
    doc_count = 0
    for fname in files:
        with open(fname) as fp:
            for line in fp:
                j_content = json.loads(line)
                issue = j_content.get("unigrams_cust")
                docs.append(issue)
                doc_count += 1
        # if doc_count >= 100000:
        #     break

    print "Files#:%d Doc_Count:%d" % (len(files), doc_count)
    print "Data load time:%s" % str((time() - st))

    print "Data loaded...."

    MIN_FREQ = 2
    MIN_DF = 2
    CUT_OFF_TOP_PERCENTILE = 0.15
    CUT_OFF_BOTTOM_PERCENTILE = 0.15
    NUM_TOPICS = 10
    NUM_WORDS = 10

    # model parameters
    LDA_GIBBS_ITER = 1
    LDA_ALPHA = 5
    LDA_BETA = 0.1

    if os.path.isfile('lda_model.pickle'):
        print "Loading a pickle file"
        with open('lda_model.pickle', 'rb') as handle:
            lda_model = pickle.load(handle)

    else:
        st = time()
        corpus = CorpusCleaner(docs)
        cleaned_docs = corpus.clean(min_freq=MIN_FREQ, min_df=MIN_DF, cut_off_top_percentile=CUT_OFF_TOP_PERCENTILE,
                                    cut_off_bottom_percentile=CUT_OFF_BOTTOM_PERCENTILE)
        print "Data clean time:%s" % str((time() - st))

        print "Data cleaning done...."

        st = time()
        corpus = Corpus()
        corpus.build_vocab(cleaned_docs)
        dictionary = corpus.get_vocab()
        doc_term_matrix = [corpus.doc2bow(doc) for doc in cleaned_docs]
        print "preparation:%s" % str((time() - st))

        print "Docs#:%d Vocab#:%d" % (len(doc_term_matrix), len(dictionary))

        lda_model = LDACollapsedGibbs(doc_term_matrix, NUM_TOPICS, len(dictionary), dictionary, n_iter=LDA_GIBBS_ITER,
                                      alpha=LDA_ALPHA,
                                      beta=LDA_BETA)

        with open('lda_model.pickle', 'wb') as handle:
            pickle.dump(lda_model, handle, protocol=pickle.HIGHEST_PROTOCOL)
        print "Done pickling"

    # print "Objects", str(len(gc.get_objects()))

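    # Parallel training rounds: the documents are split into contiguous ranges,
    # one per worker; every worker Gibbs-samples its range on a copy of the model,
    # and the returned copies are merged back into the global counts below. This is
    # an approximation (workers sample against stale counts), traded for throughput.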
    # print sys.getsizeof(lda_model)
    st = time()
    doc_length = len(lda_model.docs)
    # split the documents into one contiguous range per worker process
    step = doc_length // no_procs
    bins = [i * step for i in range(no_procs)] + [doc_length]

    for par_run in range(300):
        print "Parallel Run :", str(par_run),
        fun_args = []

        for i in range(len(bins) - 1):
            fun_args.append((lda_model, bins[i], bins[i + 1]))

        res = pool.map_async(inference_parallel, fun_args)
        res = res.get()

        # merge: each worker returns a full copy of the counts in which only its
        # document range changed, so subtract the stale global counts
        # (len(res) - 1) times and then add up all the worker copies
        n_workers = len(res)
        lda_model.m_z = lda_model.m_z * (1 - n_workers)
        lda_model.z_t = lda_model.z_t * (1 - n_workers)
        lda_model.n_z = lda_model.n_z * (1 - n_workers)

        for i, lda in enumerate(res):
            lda_model.m_z = lda_model.m_z + lda.m_z
            lda_model.z_t = lda_model.z_t + lda.z_t
            lda_model.n_z = lda_model.n_z + lda.n_z
            # print lda.start, lda.end
            lda_model.m_t[bins[i]:bins[i + 1]] = lda.m_t[bins[i]:bins[i + 1]]
        if lda_model.is_converged():
            print "Model Converged in ", str(par_run)
            break
    print "Parallel run time:%s" % str((time() - st))
    pool.close()
    pool.join()