Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import numpy
- import numpy as np
- #import matplotlib.pylab as plt
- #import random
- #from scipy.linalg import norm
- #import PIL.Image
- from sklearn.datasets import load_digits
def sigmoid(x):
    """Element-wise logistic function 1 / (1 + e^(-x)).

    Accepts a scalar or a NumPy array and returns the same shape.
    """
    denom = 1. + numpy.exp(-x)
    return 1. / denom
def dsigmoid(x):
    """Derivative of the logistic function written in terms of its output.

    If x = sigmoid(z), then d/dz sigmoid(z) = x * (1 - x).
    """
    complement = 1. - x
    return x * complement
class RBM_CD(object):
    """Bernoulli-Bernoulli Restricted Boltzmann Machine trained with CD-k.

    The weight matrix W has shape (n_visible, n_hidden); hidden and visible
    biases default to zero vectors.  Relies on the module-level ``sigmoid``
    helper for unit activations.
    """

    def __init__(self, input=None, n_visible=2, n_hidden=3,
                 W=None, hbias=None, vbias=None, rng=None):
        """Initialize model parameters.

        input  : optional (num_examples, n_visible) binary data matrix.
        W      : optional weight matrix; defaults to Uniform(-a, a) with
                 a = 1 / n_visible.
        hbias  : optional hidden bias vector (defaults to zeros).
        vbias  : optional visible bias vector (defaults to zeros).
        rng    : optional numpy RandomState (seeded with 1234 by default).
        """
        self.n_visible = n_visible  # number of units in the visible (input) layer
        self.n_hidden = n_hidden    # number of units in the hidden layer
        if rng is None:
            rng = numpy.random.RandomState(1234)
        if W is None:
            # Initialize W uniformly in (-1/n_visible, 1/n_visible).
            a = 1. / n_visible
            W = numpy.array(rng.uniform(low=-a,
                                        high=a,
                                        size=(n_visible, n_hidden)))
        if hbias is None:
            hbias = numpy.zeros(n_hidden)  # hidden bias starts at 0
        if vbias is None:
            vbias = numpy.zeros(n_visible)  # visible bias starts at 0
        self.rng = rng
        self.input = input
        self.W = W
        self.hbias = hbias
        self.vbias = vbias

    def contrastive_divergence(self, lr=0.1, k=1, input=None):
        """Perform one CD-k parameter update on W, hbias and vbias.

        lr    : learning rate.
        k     : number of Gibbs steps used for the negative phase.
        input : optional data batch; replaces self.input when given.
        """
        if input is not None:
            self.input = input
        # Positive phase: hidden probabilities/samples driven by the data.
        ph_mean, ph_sample = self.sample_h_given_v(self.input)
        # Negative phase: k alternating Gibbs steps starting from the
        # sampled hidden states.
        nh_samples = ph_sample
        for _ in range(k):
            nv_means, nv_samples, nh_means, nh_samples = \
                self.gibbs_hvh(nh_samples)
        num_examples = self.input.shape[0]
        # Positive-minus-negative phase statistics, averaged over the batch.
        self.W += lr * (numpy.dot(self.input.T, ph_mean)
                        - numpy.dot(nv_samples.T, nh_means)) / num_examples
        # BUGFIX: numpy.mean(..., axis=0) already averages over the batch;
        # the original code divided by num_examples a second time, silently
        # shrinking the effective bias learning rate by the batch size and
        # making it inconsistent with the W update above.
        self.vbias += lr * numpy.mean(self.input - nv_samples, axis=0)
        self.hbias += lr * numpy.mean(ph_mean - nh_means, axis=0)

    def sample_h_given_v(self, v0_sample):
        """Return [P(h=1|v), h ~ Bernoulli(P)] for a visible batch."""
        h1_mean = self.propup(v0_sample)
        h1_sample = self.rng.binomial(size=h1_mean.shape,  # discrete: binomial
                                      n=1,
                                      p=h1_mean)
        return [h1_mean, h1_sample]

    def sample_v_given_h(self, h0_sample):
        """Return [P(v=1|h), v ~ Bernoulli(P)] for a hidden batch."""
        v1_mean = self.propdown(h0_sample)
        v1_sample = self.rng.binomial(size=v1_mean.shape,  # discrete: binomial
                                      n=1,
                                      p=v1_mean)
        return [v1_mean, v1_sample]

    def propup(self, v):
        """Hidden activation probabilities sigmoid(v . W + hbias)."""
        pre_sigmoid_activation = numpy.dot(v, self.W) + self.hbias
        return sigmoid(pre_sigmoid_activation)

    def propdown(self, h):
        """Visible activation probabilities sigmoid(h . W^T + vbias)."""
        pre_sigmoid_activation = numpy.dot(h, self.W.T) + self.vbias
        return sigmoid(pre_sigmoid_activation)

    def gibbs_hvh(self, h0_sample):
        """One Gibbs step h -> v -> h.

        Returns [v_mean, v_sample, h_mean, h_sample].
        """
        v1_mean, v1_sample = self.sample_v_given_h(h0_sample)
        h1_mean, h1_sample = self.sample_h_given_v(v1_sample)
        return [v1_mean, v1_sample, h1_mean, h1_sample]

    def reconstruct_error(self, v):
        """Deterministic (mean-field) reconstruction error.

        Sums, per example, the expected per-bit disagreement between v and
        its one-step reconstruction, then averages over the batch.
        """
        h = sigmoid(numpy.dot(v, self.W) + self.hbias)
        reconstructed_v = sigmoid(numpy.dot(h, self.W.T) + self.vbias)
        error = numpy.mean(numpy.sum((1 - v) * reconstructed_v
                                     + v * (1 - reconstructed_v), axis=1))
        return error

    def reconstruct_error_sample(self, v):
        """Stochastic reconstruction error using sampled h then sampled v."""
        h = self.sample_h_given_v(v)[1]
        reconstructed_v = self.sample_v_given_h(h)[1]
        error = numpy.mean(numpy.sum((v - reconstructed_v) ** 2, axis=1))
        return error
class RBM_gicc(object):
    """Bernoulli RBM fitted by per-unit Newton updates on a gamma-weighted
    per-unit objective ("gicc").

    NOTE: W has shape (n_hidden, n_visible) — transposed relative to RBM_CD.
    Relies on the module-level ``sigmoid`` helper for unit activations.
    """

    def __init__(self, input=None, gamma=1/2, n_visible=2, n_hidden=3,
                 W=None, hbias=None, vbias=None, rng=None):
        """Initialize model parameters.

        input  : optional (num_examples, n_visible) binary data matrix.
        gamma  : exponent of the robust weighting in the objective.
        W      : optional (n_hidden, n_visible) weight matrix; defaults to
                 Uniform(-a, a) with a = 1 / n_visible.
        hbias  : optional hidden bias vector (defaults to zeros).
        vbias  : optional visible bias vector (defaults to zeros).
        rng    : optional numpy RandomState (seeded with 1234 by default).
        """
        self.n_visible = n_visible  # number of units in the visible (input) layer
        self.n_hidden = n_hidden    # number of units in the hidden layer
        self.gamma = gamma
        if rng is None:
            rng = numpy.random.RandomState(1234)
        if W is None:
            # Initialize W uniformly in (-1/n_visible, 1/n_visible).
            a = 1. / n_visible
            W = numpy.array(rng.uniform(low=-a,
                                        high=a,
                                        size=(n_hidden, n_visible)))
        if hbias is None:
            hbias = numpy.zeros(n_hidden)  # hidden bias starts at 0
        if vbias is None:
            vbias = numpy.zeros(n_visible)  # visible bias starts at 0
        self.rng = rng
        self.input = input
        self.W = W
        self.hbias = hbias
        self.vbias = vbias

    def gicc(self, lr=0.1, iteration=5, input=None):
        """Run per-unit Newton updates on W, vbias and hbias.

        lr        : accepted for interface compatibility; the updates below
                    are full Newton steps and do not use it.
        iteration : number of Newton iterations per unit.
        input     : optional data batch; replaces self.input when given.
        """
        if input is not None:
            self.input = input
        h_sample = self.sample_h_given_v(self.input)[1]
        # BUGFIX: the original bound W_old to self.W WITHOUT copying, so the
        # hidden-bias phase below read the already-updated weights instead of
        # the pre-update snapshot it indexes W_old for.
        W_old = self.W.copy()
        n = self.input.shape[0]
        # --- visible units: Newton-update column W[:, j] and vbias[j] ---
        for j in range(len(self.vbias)):
            vj = self.input[:, j]
            p_old = sigmoid(np.dot(h_sample, self.W[:, j].reshape(-1, 1))
                            + self.vbias[j]).T[0]
            # Clamp probabilities away from 0/1 so the robust weights and the
            # Newton denominators stay finite.
            p_old = np.clip(p_old, 0.01, 0.99)
            coef = 1 / (vj * p_old + (1 - vj) * (1 - p_old)) ** self.gamma
            for step in range(iteration):
                p = sigmoid(np.dot(h_sample, self.W[:, j].reshape(-1, 1))
                            + self.vbias[j]).T[0]
                p = np.clip(p, 0.01, 0.99)
                dw = 0   # gradient w.r.t. W[:, j] (column vector)
                ddw = 0  # Hessian w.r.t. W[:, j]
                dv = 0   # gradient w.r.t. vbias[j] (scalar)
                ddv = 0  # second derivative w.r.t. vbias[j] (scalar)
                for item in range(n):
                    dw += coef[item] * self.dfun(h_sample[item, :], vj[item], p[item]) / n
                    ddw += coef[item] * self.ddfun(h_sample[item, :], vj[item], p[item]) / n
                    dv += coef[item] * self.dfun(np.array([1]), vj[item], p[item])[0][0] / n
                    ddv += coef[item] * self.ddfun(np.array([1]), vj[item], p[item])[0][0] / n
                if not np.all(dw == 0):
                    # BUGFIX: the original inverted ddw via the deprecated
                    # np.matrix(...).I, which raises on a singular Hessian
                    # (it only printed a diagnostic first).  solve() is the
                    # preferred form; on a singular Hessian we stop the
                    # Newton iterations for this unit instead of crashing.
                    try:
                        self.W[:, j] -= np.linalg.solve(ddw, dw).ravel()
                    except np.linalg.LinAlgError:
                        break
                # Guard ddv == 0 to avoid a divide-by-zero blow-up.
                if ddv != 0 and not np.all(dv == 0):
                    self.vbias[j] -= dv / ddv
        # --- hidden units: Newton-update hbias[i] using pre-update weights ---
        for i in range(len(self.hbias)):
            hi = h_sample[:, i]
            ph_old = sigmoid(np.dot(self.input, W_old[i, :].reshape(-1, 1))
                             + self.hbias[i]).T[0]
            ph_old = np.clip(ph_old, 0.01, 0.99)
            coef = 1 / (hi * ph_old + (1 - hi) * (1 - ph_old)) ** self.gamma
            for step in range(iteration):
                p = sigmoid(np.dot(self.input, self.W[i, :].reshape(-1, 1))
                            + self.hbias[i]).T[0]
                p = np.clip(p, 0.01, 0.99)
                dh = 0   # gradient w.r.t. hbias[i]
                ddh = 0  # second derivative w.r.t. hbias[i]
                for item in range(n):
                    dh += coef[item] * self.dfun(np.array([1]), hi[item], p[item])[0][0] / n
                    ddh += coef[item] * self.ddfun(np.array([1]), hi[item], p[item])[0][0] / n
                if ddh != 0 and not np.all(dh == 0):
                    self.hbias[i] -= dh / ddh

    def fun(self, v, p):
        """Per-observation objective v * p^gamma + (1 - v) * (1 - p)^gamma."""
        f = v * p ** self.gamma + (1 - v) * (1 - p) ** self.gamma
        return f

    def dfun(self, x, v, p):
        """Gradient of `fun` w.r.t. the weights, as a column vector.

        x : input vector feeding the unit (pass [1] for a bias term).
        v : observed binary value of the unit.
        p : current activation probability of the unit.
        """
        r = self.gamma
        df = x.reshape(len(x), 1) * (v * r * (1 - p) * p ** r
                                     - (1 - v) * r * p * (1 - p) ** r)
        return df

    def ddfun(self, x, v, p):
        """Hessian of `fun` w.r.t. the weights: scalar curvature times x x^T."""
        r = self.gamma
        coef = r * p * (1 - p) * (v * r * p ** (r - 1) - (r + 1) * v * p ** r
                                  - (1 - v) * (1 - p) ** r
                                  + r * (1 - v) * p * (1 - p) ** (r - 1))
        x = x.reshape(len(x), 1)
        ddf = np.dot(x, x.T) * coef
        return ddf

    def sample_h_given_v(self, v0_sample):
        """Return [P(h=1|v), h ~ Bernoulli(P)] for a visible batch."""
        h1_mean = self.propup(v0_sample)
        h1_sample = self.rng.binomial(size=h1_mean.shape,  # discrete: binomial
                                      n=1,
                                      p=h1_mean)
        return [h1_mean, h1_sample]

    def sample_v_given_h(self, h0_sample):
        """Return [P(v=1|h), v ~ Bernoulli(P)] for a hidden batch."""
        v1_mean = self.propdown(h0_sample)
        v1_sample = self.rng.binomial(size=v1_mean.shape,  # discrete: binomial
                                      n=1,
                                      p=v1_mean)
        return [v1_mean, v1_sample]

    def propup(self, v):
        """Hidden activation probabilities sigmoid(v . W^T + hbias)."""
        pre_sigmoid_activation = numpy.dot(v, self.W.T) + self.hbias
        return sigmoid(pre_sigmoid_activation)

    def propdown(self, h):
        """Visible activation probabilities sigmoid(h . W + vbias)."""
        pre_sigmoid_activation = numpy.dot(h, self.W) + self.vbias
        return sigmoid(pre_sigmoid_activation)

    def reconstruct_error(self, v):
        """Deterministic (mean-field) reconstruction error.

        Sums, per example, the expected per-bit disagreement between v and
        its one-step reconstruction, then averages over the batch.
        """
        h = sigmoid(numpy.dot(v, self.W.T) + self.hbias)
        reconstructed_v = sigmoid(numpy.dot(h, self.W) + self.vbias)
        error = numpy.mean(numpy.sum((1 - v) * reconstructed_v
                                     + v * (1 - reconstructed_v), axis=1))
        return error

    def reconstruct_error_sample(self, v):
        """Stochastic reconstruction error using sampled h then sampled v."""
        h = self.sample_h_given_v(v)[1]
        reconstructed_v = self.sample_v_given_h(h)[1]
        error = numpy.mean(numpy.sum((v - reconstructed_v) ** 2, axis=1))
        return error
def test_rbm(learning_rate=0.1, training_epochs=50):
    """Demo driver: binarize the sklearn digits data and train RBM_gicc,
    printing the sampled reconstruction error after every epoch.

    learning_rate   : forwarded to the trainer as `lr`.
    training_epochs : number of passes over the data.
    """
    digits = load_digits()
    data = np.array(digits.data)
    data[data > 0] = 1  # binarize pixel intensities

    rng = numpy.random.RandomState(123)

    # Build both models; the shared rng means construction order matters,
    # and the CD baseline can be re-enabled in the loop below if desired.
    rbm_CD = RBM_CD(input=data, n_visible=64, n_hidden=5, rng=rng)
    rbm_gicc = RBM_gicc(input=data, n_visible=64, n_hidden=5, rng=rng)

    for epoch in range(training_epochs):
        rbm_gicc.gicc(lr=learning_rate, iteration=10)
        cost_gicc = rbm_gicc.reconstruct_error_sample(data)
        print('Training epoch %d, cost of gicc is %f.' % (epoch + 1, cost_gicc))
- if __name__ == "__main__":
- test_rbm()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement