Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import random
- import copy
- import numpy as np
- from scipy.sparse import lil_matrix
class ATM:
    """Author-topic model fitted with a token-level Gibbs-style sampler.

    Every token of every document carries two latent assignments: a topic
    (stored in ``Z``) and the position of one of the document's authors
    (stored in ``Y``).  The sampler repeatedly re-draws both assignments for
    each token from a distribution built from two count tables:

    * ``nak[a, k]`` -- number of tokens assigned to author ``a`` and topic ``k``
    * ``nkv[k, v]`` -- number of tokens of word ``v`` assigned to topic ``k``

    Parameters
    ----------
    K : int
        Number of topics.
    alpha : float
        Smoothing constant added to the author-topic counts.
    beta : float
        Smoothing constant added to the topic-word counts.
    max_iter : int
        Number of sweeps over the corpus.  At least one sweep is always
        performed, even when ``max_iter`` is zero or negative.
    verbose : int, optional
        When truthy, the number of remaining sweeps is printed before each
        sweep.
    """

    def __init__(self, K, alpha, beta, max_iter, verbose=0):
        self.K = K
        self.alpha = alpha
        self.beta = beta
        self.max_iter = max_iter
        self.verbose = verbose

    def fit(self, W, A, V, S):
        """Run the sampler and remember the best-scoring assignment.

        Parameters
        ----------
        W : sequence of sequences of int
            ``W[d][i]`` is the word index (``0 <= index < V``) of token ``i``
            in document ``d``.
        A : sequence of sequences of int
            ``A[d][j]`` is the id (``0 <= id < S``) of the ``j``-th author of
            document ``d``.
        V : int
            Vocabulary size.
        S : int
            Number of distinct authors.

        Returns
        -------
        ATM
            ``self``.  After the call ``Z``/``Y`` hold the final assignment,
            ``nak``/``nkv`` the matching count tables, and
            ``max_score``/``max_Z``/``max_Y`` the best score seen after any
            sweep together with an independent copy of the assignment that
            produced it.
        """
        self._W = W
        self._A = A
        self._D = len(W)  # number of documents
        self._V = V       # number of vocabularies
        self._S = S       # number of distinct authors

        self.Z = self._init_Z()
        self.Y = self._init_Y()
        self.nak = self._init_nak()
        self.nkv = self._init_nkv()
        # Row totals are maintained incrementally next to the count tables so
        # they do not have to be recomputed for every token.
        nkv_sum = self.nkv.sum(axis=1)
        nak_sum = self.nak.sum(axis=1)

        # The score is a sum of non-negative terms, so -1 is below any value
        # it can take and the first sweep always produces a snapshot.
        self.max_score = -1
        self.max_Z = None
        self.max_Y = None

        remained_iter = self.max_iter
        while True:
            if self.verbose:
                print(remained_iter)

            # Visit documents, and the tokens inside each, in random order.
            for d in np.random.permutation(self._D):
                for i in np.random.permutation(len(self._W[d])):
                    self._resample_token(d, i, nkv_sum, nak_sum)

            s = self.score(nkv_sum, nak_sum)
            if s > self.max_score:
                self.max_score = s
                # The per-document arrays are mutated in place by later
                # sweeps, so each one has to be copied individually.
                self.max_Z = [np.copy(z_d) for z_d in self.Z]
                self.max_Y = [np.copy(y_d) for y_d in self.Y]

            remained_iter -= 1
            if remained_iter <= 0:
                break
        return self

    def _resample_token(self, d, i, nkv_sum, nak_sum):
        """Re-draw topic and author for token ``i`` of document ``d``.

        The token is first removed from ``nak``/``nkv`` and from the running
        totals ``nkv_sum``/``nak_sum`` (both updated in place), a new
        (topic, author position) pair is sampled, and the token is added back
        under the new assignment.
        """
        k = self.Z[d][i]     # topic
        v = self._W[d][i]    # word index
        j = self.Y[d][i]     # author index within document d
        a = self._A[d][j]    # author
        self.nak[a][k] -= 1
        self.nkv[k][v] -= 1
        nkv_sum[k] -= 1
        nak_sum[a] -= 1

        self.Z[d][i], self.Y[d][i] = self._sample_z_and_y(
            d, v, nkv_sum, nak_sum)

        new_k = self.Z[d][i]
        new_a = self._A[d][self.Y[d][i]]
        self.nak[new_a][new_k] += 1
        self.nkv[new_k][v] += 1
        nkv_sum[new_k] += 1
        nak_sum[new_a] += 1

    def _init_Z(self):
        """Return one array per document with a random topic for each token."""
        return [
            np.random.randint(low=0, high=self.K, size=len(doc))
            for doc in self._W
        ]

    def _init_Y(self):
        """Return one array per document with a random author position
        (an index into ``A[d]``) for each token."""
        return [
            np.random.randint(low=0, high=len(self._A[d]), size=len(doc))
            for d, doc in enumerate(self._W)
        ]

    def _init_nak(self):
        """Build the ``S x K`` author-topic count table from ``Z`` and ``Y``."""
        nak = np.zeros((self._S, self.K))
        for d in range(self._D):
            authors = self._A[d]
            for k, j in zip(self.Z[d], self.Y[d]):
                nak[authors[j], k] += 1
        return nak

    def _init_nkv(self):
        """Build the ``K x V`` topic-word count table from ``Z`` and ``W``."""
        nkv = np.zeros((self.K, self._V))
        for d in range(self._D):
            for k, v in zip(self.Z[d], self._W[d]):
                nkv[k, v] += 1
        return nkv

    def _sample_z_and_y(self, d, v, nkv_sum, nak_sum):
        """Draw a (topic, author position) pair for word ``v`` in document ``d``.

        For every author of the document a length-``K`` weight vector is
        formed as the product of the smoothed topic-word ratio for ``v`` and
        the smoothed author-topic ratio.  The vectors are concatenated in
        author order, normalised, and a single index is drawn from the result.

        Returns
        -------
        (z, y)
            ``z`` is the topic and ``y`` the author's position in ``A[d]``;
            both are integers.
        """
        p1 = (self.nkv[:, v] + self.beta) / (nkv_sum + self.beta * self._V)
        prob = np.array([
            p1 * ((self.nak[a] + self.alpha)
                  / (nak_sum[a] + self.alpha * self.K))
            for a in self._A[d]
        ]).flatten()
        prob = prob / prob.sum()
        zy = np.random.multinomial(n=1, pvals=prob).argmax()
        # The flat index is author_position * K + topic.  Floor division keeps
        # the author position an integer on Python 3 as well as Python 2.
        z = zy % self.K
        y = zy // self.K
        return z, y

    def score(self, nkv_sum, nak_sum):
        """Return the sum, over all tokens, of the weight of the token's
        current (topic, author) assignment.

        ``nkv_sum`` and ``nak_sum`` are the row sums of ``nkv`` and ``nak``.
        """
        s = 0
        for d in range(self._D):
            authors = self._A[d]
            for v, k, j in zip(self._W[d], self.Z[d], self.Y[d]):
                a = authors[j]
                s += (((self.nkv[k, v] + self.beta)
                       / (nkv_sum[k] + self.beta * self._V))
                      * ((self.nak[a, k] + self.alpha)
                         / (nak_sum[a] + self.alpha * self.K)))
        return s
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement