import random
import numpy
import matplotlib.pyplot as plt

###########################
## generating the data
# generate 2/3 n observations from hot, then 1/3 n from cold.
# returns: observations, #hot, #cold
def generate_observations(n):
    # probabilities of ice cream amounts given hot / cold:
    # shown here as counts of 1's, 2's, 3's out of 10 days
    hot = [1, 1, 2, 2, 2, 2, 3, 3, 3, 3]
    cold = [1, 1, 1, 1, 1, 2, 2, 2, 2, 3]

    observations = []
    # choose the first 2/3 n observations from "hot"
    numhot = int(2.0 / 3 * n)
    for i in range(numhot): observations.append(random.choice(hot))
    # choose the remaining 1/3 n observations from "cold"
    numcold = n - numhot
    for i in range(numcold): observations.append(random.choice(cold))

    return (observations, numhot, numcold)

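# A quick sanity-check sketch (my addition, not part of the original paste):
# draw a small sample and confirm that roughly 2/3 of the observations come
# from "hot". The sample size 30 is an arbitrary illustrative choice.
def example_generate_observations():
    observations, numhot, numcold = generate_observations(30)
    print("observations:", observations)
    print("drawn from hot:", numhot, "drawn from cold:", numcold)
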
###########################
# data structures:
#
# numstates: number of states (N). we omit explicit start and end states here
# and instead keep a separate vector of initial probabilities.
#
# initial probabilities: numpy array of length N where entry j has P(q_1 = j).
#
# emission probabilities: numpy matrix of shape (N, |V|) (where |V| is the size of the vocabulary)
# where entry (j, o) has emis_j(o).
#
# transition probabilities: numpy matrix of shape (N, N) where entry (i, j) has trans(i, j).
#
# forward and backward probabilities: numpy matrix of shape (N, T) where entry (s, t) has
# the forward (resp. backward) probability for state s and timepoint t.
################
# computing forward and backward probabilities
# forward:
# alpha_t(j) = P(o_1, ..., o_t, q_t = j | HMM)
# forward function: returns numpy matrix of size (N, T)
def forwardprobs(observations, initialprob, trans, emis, numstates, observation_indices):
    forwardmatrix = numpy.zeros((numstates, len(observations)))

    # initialization
    obs_index = observation_indices[observations[0]]
    for s in range(numstates):
        forwardmatrix[s, 0] = initialprob[s] * emis[s, obs_index]

    # recursion step
    for t in range(1, len(observations)):
        obs_index = observation_indices[observations[t]]
        for s in range(numstates):
            forwardmatrix[s, t] = emis[s, obs_index] * sum([forwardmatrix[s2, t-1] * trans[s2, s]
                                                            for s2 in range(numstates)])
    return forwardmatrix

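# A vectorized sketch of the same forward recursion (my addition, not part of
# the original paste). It assumes trans and emis are plain numpy arrays; the
# inner sum over previous states becomes a single matrix-vector product.
def forwardprobs_vectorized(observations, initialprob, trans, emis, numstates, observation_indices):
    T = len(observations)
    forwardmatrix = numpy.zeros((numstates, T))
    forwardmatrix[:, 0] = initialprob * emis[:, observation_indices[observations[0]]]
    for t in range(1, T):
        obs_index = observation_indices[observations[t]]
        # entry s of trans.T @ forwardmatrix[:, t-1] is sum_{s2} forwardmatrix[s2, t-1] * trans[s2, s]
        forwardmatrix[:, t] = emis[:, obs_index] * (trans.T @ forwardmatrix[:, t - 1])
    return forwardmatrix
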
# beta_t(j) = P(o_{t+1}, ..., o_T | q_t = j, HMM)
# backward function: returns numpy matrix of size (N, T)
def backwardprobs(observations, trans, emis, numstates, observation_indices):
    backwardmatrix = numpy.zeros((numstates, len(observations)))

    # initialization
    for s in range(numstates):
        backwardmatrix[s, len(observations) - 1] = 1.0

    # recursion
    for t in range(len(observations) - 2, -1, -1):
        obs_index = observation_indices[observations[t+1]]
        for s in range(numstates):
            backwardmatrix[s, t] = sum([trans[s, s2] * emis[s2, obs_index] * backwardmatrix[s2, t+1]
                                        for s2 in range(numstates)])

    return backwardmatrix

def test_alphabeta():
    observations = [3, 1, 3]
    # use plain numpy arrays rather than the deprecated numpy.matrix class
    trans = numpy.array([[0.7, 0.3], [0.4, 0.6]])
    emis = numpy.array([[0.2, 0.4, 0.4], [0.5, 0.4, 0.1]])
    initialprob = numpy.array([0.8, 0.2])
    numstates = 2
    obs_indices = {1: 0, 2: 1, 3: 2}

    print("FORWARD")
    print(forwardprobs(observations, initialprob, trans, emis, numstates, obs_indices))
    print("\n")

    print("BACKWARD")
    print(backwardprobs(observations, trans, emis, numstates, obs_indices))
    print("\n")

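# Hedged sanity-check sketch (my addition, not part of the original paste):
# for the same toy HMM, the total observation likelihood P(O | HMM) can be read
# off either the last forward column or the first backward column (weighted by
# the initial and first-emission probabilities); the two values should agree.
def test_likelihood_agreement():
    observations = [3, 1, 3]
    trans = numpy.array([[0.7, 0.3], [0.4, 0.6]])
    emis = numpy.array([[0.2, 0.4, 0.4], [0.5, 0.4, 0.1]])
    initialprob = numpy.array([0.8, 0.2])
    numstates = 2
    obs_indices = {1: 0, 2: 1, 3: 2}

    forward = forwardprobs(observations, initialprob, trans, emis, numstates, obs_indices)
    backward = backwardprobs(observations, trans, emis, numstates, obs_indices)

    p_from_forward = sum(forward[s, len(observations) - 1] for s in range(numstates))
    p_from_backward = sum(initialprob[s] * emis[s, obs_indices[observations[0]]] * backward[s, 0]
                          for s in range(numstates))
    print("P(O) from forward: ", p_from_forward)
    print("P(O) from backward:", p_from_backward)
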
####
# expectation step:
# compute xi_t(i, j) and gamma_t(j) from the current model and the forward/backward matrices
# returns two things:
# - gamma is a (N, T) numpy matrix
# - xi is a list of T-1 numpy matrices of size (N, N), one per transition
def expectation(observations, trans, emis, numstates, observation_indices, forward, backward):
    # denominator: P(O | HMM)
    p_o_given_hmm = sum([forward[s_i, len(observations) - 1] for s_i in range(numstates)])

    # computing xi
    xi = []
    for t in range(len(observations) - 1):
        obs_index = observation_indices[observations[t+1]]

        xi_t = numpy.zeros((numstates, numstates))

        for s_i in range(numstates):
            for s_j in range(numstates):
                xi_t[s_i, s_j] = (forward[s_i, t] * trans[s_i, s_j] * emis[s_j, obs_index] * backward[s_j, t+1]) / p_o_given_hmm
        xi.append(xi_t)

    # computing gamma
    gamma = numpy.zeros((numstates, len(observations)))
    for t in range(len(observations) - 1):
        for s_i in range(numstates):
            gamma[s_i, t] = sum([xi[t][s_i, s_j] for s_j in range(numstates)])

    # for the last timepoint there is no outgoing transition, so sum the last xi
    # over the *previous* state instead
    last_xi = xi[len(observations) - 2]
    for s_j in range(numstates):
        gamma[s_j, len(observations) - 1] = sum([last_xi[s_i, s_j] for s_i in range(numstates)])

    return (gamma, xi)

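# A small hedged check (my addition, not part of the original paste): each
# column of gamma is a posterior distribution over states given the whole
# observation sequence, so every column should sum to (approximately) 1.
def check_gamma_normalized(gamma, numobservations, numstates):
    for t in range(numobservations):
        colsum = sum(gamma[s, t] for s in range(numstates))
        assert abs(colsum - 1.0) < 1e-6, (t, colsum)
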
###
# maximization step:
# re-estimate initialprob, trans, and emis based on gamma and xi
# returns:
# - initialprob
# - trans
# - emis
def maximization(observations, gamma, xi, numstates, observation_indices, vocabsize):
    # re-estimate initial probabilities
    initialprob = numpy.array([gamma[s_i, 0] for s_i in range(numstates)])

    # re-estimate emission probabilities
    emis = numpy.zeros((numstates, vocabsize))

    for s in range(numstates):
        denominator = sum([gamma[s, t] for t in range(len(observations))])
        for vocab_item, obs_index in observation_indices.items():
            emis[s, obs_index] = sum([gamma[s, t] for t in range(len(observations))
                                      if observations[t] == vocab_item]) / denominator

    # re-estimate transition probabilities
    trans = numpy.zeros((numstates, numstates))

    for s_i in range(numstates):
        denominator = sum([gamma[s_i, t] for t in range(len(observations) - 1)])

        for s_j in range(numstates):
            trans[s_i, s_j] = sum([xi[t][s_i, s_j] for t in range(len(observations) - 1)]) / denominator

    return (initialprob, trans, emis)

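# Hedged sanity-check sketch (my addition, not part of the original paste):
# after the maximization step, initialprob should sum to 1 and every row of
# trans and emis should be a probability distribution over states / vocabulary.
def check_maximization_output(initialprob, trans, emis):
    assert abs(initialprob.sum() - 1.0) < 1e-6
    assert numpy.allclose(trans.sum(axis=1), 1.0)
    assert numpy.allclose(emis.sum(axis=1), 1.0)
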
##########
# testing forward/backward
def test_forwardbackward(numobservations, numiter):
    ########
    # generate observations
    observations, truenumhot, truenumcold = generate_observations(numobservations)
    obs_indices = {1: 0, 2: 1, 3: 2}
    numstates = 2
    vocabsize = 3

    #####
    # HMM initialization

    # initialize initial probs
    unnormalized = numpy.random.rand(numstates)
    initialprob = unnormalized / sum(unnormalized)

    # initialize emission probs
    emis = numpy.zeros((numstates, vocabsize))
    for s in range(numstates):
        unnormalized = numpy.random.rand(vocabsize)
        emis[s] = unnormalized / sum(unnormalized)

    # initialize transition probs
    trans = numpy.zeros((numstates, numstates))
    for s in range(numstates):
        unnormalized = numpy.random.rand(numstates)
        trans[s] = unnormalized / sum(unnormalized)

    print("OBSERVATIONS:")
    print(observations)
    print("\n")

    print("Random initialization:")
    print("INITIALPROB")
    print(initialprob)
    print("\n")

    print("EMIS")
    print(emis)
    print("\n")

    print("TRANS")
    print(trans)
    print("\n")

    input("Press Enter to start the EM iterations...")

    for iteration in range(numiter):

        forward = forwardprobs(observations, initialprob, trans, emis, numstates, obs_indices)
        backward = backwardprobs(observations, trans, emis, numstates, obs_indices)

        gamma, xi = expectation(observations, trans, emis, numstates, obs_indices, forward, backward)

        initialprob, trans, emis = maximization(observations, gamma, xi, numstates, obs_indices, vocabsize)

        print("Re-computed:")
        print("INITIALPROB")
        print(initialprob)
        print("\n")

        print("EMIS")
        print(emis)
        print("\n")

        print("TRANS")
        print(trans)
        print("\n")

        print("GAMMA(1)")
        print(gamma[0])
        print("\n")

        print("GAMMA(2)")
        print(gamma[1])
        print("\n")

        # the first truenumhot observations were generated from the "hot" state.
        # what is the average probability of being in state 1 for the first
        # truenumhot observations, as opposed to the rest?
        avgprob_state1_for_truehot = sum(gamma[0][:truenumhot]) / truenumhot
        avgprob_state1_for_truecold = sum(gamma[0][truenumhot:]) / truenumcold
        print("Average prob. of being in state 1 when true state was Hot:", avgprob_state1_for_truehot)
        print("Average prob. of being in state 1 when true state was Cold:", avgprob_state1_for_truecold)

        # plot observations and probabilities of being in certain states
        from matplotlib import interactive
        interactive(True)
        xpoints = numpy.arange(len(observations))
        fig, ax1 = plt.subplots()
        ax1.plot(xpoints, observations, "b-")
        plt.ylim([0, 4])
        ax1.set_xlabel("timepoints")
        ax1.set_ylabel("observations", color="b")

        ax2 = ax1.twinx()
        ax2.plot(xpoints, gamma[0], "r-")
        plt.ylim([0.0, 1.0])
        ax2.set_ylabel("prob", color="r")
        plt.show()
        input("Press Enter for the next iteration...")
        plt.close()
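
# Example driver calls (my addition; the sample size 30 and 10 EM iterations
# are illustrative choices, not taken from the original paste):
if __name__ == "__main__":
    test_alphabeta()
    test_forwardbackward(30, 10)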