Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import random
- import numpy
- import matplotlib.pyplot as plt
- ###########################
- ## generating the data
- # generate 2/3 n from hot, then 1/3 n from cold.
- # returns: observations, #hot, #cold
def generate_observations(n):
    """Draw n ice-cream observations from a two-state weather model.

    The first ~2/3 of the observations are sampled from the "hot"
    distribution, the remaining ~1/3 from the "cold" distribution.

    Returns (observations, numhot, numcold).
    """
    # Emission distributions written out as 10-day samples:
    # relative frequency of eating 1, 2, or 3 ice creams per day.
    hot = [1, 1, 2, 2, 2, 2, 3, 3, 3, 3]
    cold = [1, 1, 1, 1, 1, 2, 2, 2, 2, 3]
    # first two thirds of the days are "hot", the rest "cold"
    numhot = int(2.0 / 3 * n)
    numcold = n - numhot
    observations = [random.choice(hot) for _ in range(numhot)]
    observations.extend(random.choice(cold) for _ in range(numcold))
    return (observations, numhot, numcold)
- ###########################
- # data structures:
- #
- # numstates: number of states. we omit start and end state here, and assume equal probability of starting in either state.
- #
- # emission probabilities: numpy matrix of shape (N, |V|) (where |V| is the size of the vocabulary)
- # where entry (j, o) has emis_j(o).
- #
- # transition probabilities: numpy matrix of shape (N, N) where entry (i, j) has trans(i, j).
- #
- # forward and backward probabilities: numpy matrix of shape (N, T) where entry (s, t) has forward probability
- # for state s and observation t
- ################
- # computing forward and backward probabilities
- # forward:
- # alpha_t(j) = P(o1, ..., ot, qt = j | HMM)
- # forward function: returns numpy matrix of size (N, T)
def forwardprobs(observations, initialprob, trans, emis, numstates, observation_indices):
    """Compute forward probabilities alpha_t(j) = P(o_1, ..., o_t, q_t = j | HMM).

    Returns an (N, T) numpy array where entry (s, t) holds alpha_t(s).
    """
    T = len(observations)
    alpha = numpy.zeros((numstates, T))
    # initialization: alpha_0(s) = pi(s) * emis_s(o_0)
    first = observation_indices[observations[0]]
    for state in range(numstates):
        alpha[state, 0] = initialprob[state] * emis[state, first]
    # recursion: alpha_t(s) = emis_s(o_t) * sum_{s'} alpha_{t-1}(s') * trans(s', s)
    for t in range(1, T):
        o = observation_indices[observations[t]]
        for state in range(numstates):
            total = 0.0
            for prev in range(numstates):
                total += alpha[prev, t - 1] * trans[prev, state]
            alpha[state, t] = emis[state, o] * total
    return alpha
- # beta_t(j) = P(o_{t+1}, ..., o_T | qt = j, HMM)
- # backward function: returns numpy matrix of size (N, T)
def backwardprobs(observations, trans, emis, numstates, observation_indices):
    """Compute backward probabilities beta_t(j) = P(o_{t+1}, ..., o_T | q_t = j, HMM).

    Returns an (N, T) numpy array where entry (s, t) holds beta_t(s).
    """
    T = len(observations)
    beta = numpy.zeros((numstates, T))
    # base case: beta_{T-1}(s) = 1 for every state
    beta[:, T - 1] = 1.0
    # recursion, walking backwards through time:
    # beta_t(s) = sum_{s'} trans(s, s') * emis_{s'}(o_{t+1}) * beta_{t+1}(s')
    for t in reversed(range(T - 1)):
        nxt = observation_indices[observations[t + 1]]
        for state in range(numstates):
            beta[state, t] = sum(
                trans[state, succ] * emis[succ, nxt] * beta[succ, t + 1]
                for succ in range(numstates)
            )
    return beta
def test_alphabeta():
    """Print forward and backward matrices for a small worked 3-observation example."""
    observations = [3, 1, 3]
    trans = numpy.matrix("0.7 0.3; 0.4 0.6")
    emis = numpy.matrix("0.2 0.4 0.4; 0.5 0.4 0.1")
    initialprob = numpy.array([0.8, 0.2])
    numstates = 2
    obs_indices = {1: 0, 2: 1, 3: 2}
    fwd = forwardprobs(observations, initialprob, trans, emis, numstates, obs_indices)
    bwd = backwardprobs(observations, trans, emis, numstates, obs_indices)
    for label, matrix in (("FORWARD", fwd), ("BACKWARD", bwd)):
        print(label)
        print(matrix)
        print("\n")
- ####
- # expectation step:
- # re-estimate xi_t(i, j) and gamma_t(j)
- # returns two things:
- # - gamma is a (N, T) numpy matrix
- # - xi is a list of T numpy matrices of size (N, N)
def expectation(observations, trans, emis, numstates, observation_indices, forward, backward):
    """E-step of Baum-Welch: compute expected state and transition posteriors.

    Parameters:
    - observations: list of observed symbols, length T
    - trans: (N, N) matrix, trans[i, j] = P(next state j | current state i)
    - emis: (N, |V|) matrix of emission probabilities
    - numstates: N, number of hidden states
    - observation_indices: maps observation symbol -> column index into emis
    - forward, backward: (N, T) alpha / beta matrices

    Returns (gamma, xi):
    - gamma: (N, T) numpy array, gamma[j, t] = P(q_t = j | O, HMM)
    - xi: list of T-1 numpy matrices of size (N, N),
      xi[t][i, j] = P(q_t = i, q_{t+1} = j | O, HMM)
    """
    T = len(observations)
    # denominator P(O | HMM): sum the final column of the forward matrix
    p_o_given_hmm = sum(forward[s_i, T - 1] for s_i in range(numstates))
    # xi_t(i, j) = alpha_t(i) * trans(i, j) * emis_j(o_{t+1}) * beta_{t+1}(j) / P(O)
    xi = []
    for t in range(T - 1):
        obs_index = observation_indices[observations[t + 1]]
        xi_t = numpy.zeros((numstates, numstates))
        for s_i in range(numstates):
            for s_j in range(numstates):
                xi_t[s_i, s_j] = (forward[s_i, t] * trans[s_i, s_j]
                                  * emis[s_j, obs_index] * backward[s_j, t + 1]) / p_o_given_hmm
        xi.append(xi_t)
    # gamma_t(j) = alpha_t(j) * beta_t(j) / P(O).
    # BUGFIX: gamma was previously over-allocated as (numstates + 2, T), and its
    # last column was filled using the loop variable `t` leaked from the xi loop
    # (a NameError when T == 1). Computing gamma directly from alpha * beta
    # yields exactly the same values as the old xi-based sums for every column
    # (sum_j xi_t(i, j) == alpha_t(i) * beta_t(i) / P(O)) without that fragility.
    gamma = numpy.zeros((numstates, T))
    for t in range(T):
        for s_i in range(numstates):
            gamma[s_i, t] = forward[s_i, t] * backward[s_i, t] / p_o_given_hmm
    return (gamma, xi)
- ###
- # maximization step:
- # re-estimate trans, emis based on gamma, xi
- # returns:
- # - initialprob
- # - trans
- # - emis
def maximization(observations, gamma, xi, numstates, observation_indices, vocabsize):
    """M-step of Baum-Welch: re-estimate HMM parameters from expected counts.

    Returns (initialprob, trans, emis):
    - initialprob: length-N numpy array of initial state probabilities
    - trans: (N, N) transition probabilities
    - emis: (N, vocabsize) emission probabilities
    """
    T = len(observations)
    # initial probabilities: expected state occupancy at t = 0
    initialprob = numpy.array([gamma[state, 0] for state in range(numstates)])
    # emission probabilities: expected count of emitting each symbol in
    # state s, normalized by the total expected occupancy of s
    emis = numpy.zeros((numstates, vocabsize))
    for state in range(numstates):
        occupancy = sum(gamma[state, t] for t in range(T))
        for symbol, column in observation_indices.items():
            matching = sum(gamma[state, t] for t in range(T) if observations[t] == symbol)
            emis[state, column] = matching / occupancy
    # transition probabilities: expected i -> j transitions normalized by
    # expected departures from i (the last timepoint has no outgoing transition)
    trans = numpy.zeros((numstates, numstates))
    for src in range(numstates):
        departures = sum(gamma[src, t] for t in range(T - 1))
        for dst in range(numstates):
            trans[src, dst] = sum(xi[t][src, dst] for t in range(T - 1)) / departures
    return (initialprob, trans, emis)
- ##########
- # testing forward/backward
def test_forwardbackward(numobservations, numiter):
    """Interactive demo: run Baum-Welch EM on synthetic ice-cream data.

    Generates `numobservations` observations (first ~2/3 from the "hot"
    distribution), randomly initializes an HMM, runs `numiter` EM iterations,
    reports how well state 1 tracks the true Hot regime, and plots the
    observations against P(state 1) over time.

    Note: this function blocks on input() and opens a matplotlib window,
    so it is meant to be run interactively, not under automation.

    NOTE(review): this file was recovered from a flattened paste, so the
    exact extent of the per-iteration loop below was reconstructed —
    confirm against the original which statements sit inside the loop.
    """
    ########
    # generate observation
    observations, truenumhot, truenumcold = generate_observations(numobservations)
    obs_indices = { 1 : 0, 2 : 1, 3: 2}
    numstates = 2
    vocabsize = 3
    #####
    # HMM initialization
    # initialize initial probs: random, normalized to sum to 1
    unnormalized = numpy.random.rand(numstates)
    initialprob = unnormalized / sum(unnormalized)
    # initialize emission probs: one random normalized row per state
    emis = numpy.zeros((numstates, vocabsize))
    for s in range(numstates):
        unnormalized = numpy.random.rand(vocabsize)
        emis[s] = unnormalized / sum(unnormalized)
    # initialize transition probs: one random normalized row per state
    trans = numpy.zeros((numstates, numstates))
    for s in range(numstates):
        unnormalized = numpy.random.rand(numstates)
        trans[s] = unnormalized / sum(unnormalized)
    print("OBSERVATIONS:")
    print(observations)
    print("\n")
    print("Random initialization:")
    print("INITIALPROB")
    print(initialprob)
    print("\n")
    print("EMIS")
    print(emis)
    print("\n")
    print("TRANS")
    print(trans)
    print("\n")
    # pause so the user can inspect the random starting point
    input()
    # EM: alternate E-step (expected counts) and M-step (re-estimation),
    # printing the re-estimated model each iteration
    for iteration in range(numiter):
        forward = forwardprobs(observations, initialprob, trans, emis, numstates, obs_indices)
        backward = backwardprobs(observations, trans, emis, numstates, obs_indices)
        gamma, xi = expectation(observations, trans, emis, numstates, obs_indices, forward, backward)
        initialprob, trans, emis = maximization(observations, gamma, xi, numstates, obs_indices, vocabsize)
        print("Re-computed:")
        print("INITIALPROB")
        print(initialprob)
        print("\n")
        print("EMIS")
        print(emis)
        print("\n")
        print("TRANS")
        print(trans)
        print("\n")
        print("GAMMA(1)")
        print(gamma[0])
        print("\n")
        print("GAMMA(2)")
        print(gamma[1])
        print("\n")
    # the first truenumhot observations were generated from the "hot" state.
    # what is the probability of being in state 1 for the first
    # truenumhot observations as opposed to the rest
    avgprob_state1_for_truehot = sum(gamma[0][:truenumhot]) / truenumhot
    avgprob_state1_for_truecold = sum(gamma[0][truenumhot:]) / truenumcold
    print("Average prob. of being in state 1 when true state was Hot:", avgprob_state1_for_truehot)
    print("Average prob. of being in state 1 when true state was Cold:", avgprob_state1_for_truecold)
    # plot observations and probabilities of being in certain states:
    # observations on the left axis (blue), P(state 1) on the right (red)
    from matplotlib import interactive
    interactive(True)
    xpoints = numpy.arange(len(observations))
    fig, ax1 = plt.subplots()
    ax1.plot(xpoints, observations, "b-")
    plt.ylim([0, 4])
    ax1.set_xlabel("timepoints")
    ax1.set_ylabel("observations", color = "b")
    ax2 = ax1.twinx()
    ax2.plot(xpoints, gamma[0], "r-")
    plt.ylim([0.0, 1.0])
    ax2.set_ylabel("prob", color = "r")
    plt.show()
    # keep the window open until the user presses Enter
    input()
    plt.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement