Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #! python
- import numpy as np
- import operator
- dynamic_model = np.array([
- [0.7, 0.3],
- [0.3, 0.7]
- ])
- observation_model = np.array([
- [0.9, 0.0],
- [0.0, 0.2]
- ])
- initial_rain_probabilities = np.array(
- [0.5, 0.5]
- )
- evidences = [True, True, False, True, True]
- def forward(state, evidence):
- obsmodel = observation_model
- if not evidence:
- obsmodel = np.eye(len(obsmodel)) - obsmodel
- result = np.dot(obsmodel, np.transpose(dynamic_model))
- result = np.dot(result, state)
- return result/result.sum()
- def backward(probability, evidence):
- obsmodel = observation_model
- if not evidence:
- obsmodel = np.eye(len(obsmodel)) - obsmodel
- result = np.dot(dynamic_model, obsmodel)
- result = np.dot(result, probability)
- return result
- def viterbi():
- """Return the best path, given an HMM model and a sequence of observations"""
- # A - initialise stuff
- nSamples = len(evidences)
- nStates = dynamic_model.shape[0] # number of states
- c = np.zeros(nSamples) #scale factors (necessary to prevent underflow)
- viterbi = np.zeros((nStates,nSamples)) # initialise viterbi table
- psi = np.zeros((nStates,nSamples)) # initialise the best path table
- best_path = np.zeros(nSamples); # this will be your output
- # B- appoint initial values for viterbi and best path (bp) tables - Eq (32a-32b)
- viterbi[:,0] = initial_rain_probabilities.T * observation_model[:, evidences[0]]
- c[0] = 1.0/np.sum(viterbi[:,0])
- viterbi[:,0] = c[0] * viterbi[:,0] # apply the scaling factor
- psi[0] = 0;
- # C- Do the iterations for viterbi and psi for time>0 until T
- for t in range(1,nSamples): # loop through time
- for s in range (0,nStates): # loop through the states @(t-1)
- trans_p = viterbi[:,t-1] * dynamic_model[:,s]
- psi[s,t], viterbi[s,t] = max(enumerate(trans_p), key=operator.itemgetter(1))
- viterbi[s,t] = viterbi[s,t]*observation_model[s, evidences[t]]
- c[t] = 1.0/np.sum(viterbi[:,t]) # scaling factor
- viterbi[:,t] = c[t] * viterbi[:,t]
- # D - Back-tracking
- best_path[nSamples-1] = viterbi[:,nSamples-1].argmax() # last state
- for t in range(nSamples-1,0,-1): # states of (last-1)th to 0th time step
- best_path[t-1] = psi[best_path[t],t]
- return best_path
- print "Forward phase"
- forward_states = np.array([None]*(len(evidences)+1))
- forward_states[0] = initial_rain_probabilities
- for evidence_no in range(0, len(evidences)):
- forward_states[evidence_no + 1] = forward(
- forward_states[evidence_no],
- evidences[evidence_no]
- )
- print evidence_no+1, forward_states[evidence_no + 1]
- print "\nBackward phase"
- backward_states = np.array([None]*(len(evidences)+1))
- backward_states[0] = initial_rain_probabilities
- backward_probability = np.array([1.0, 1.0])
- for evidence_no in range(len(evidences), -1, -1):
- new_probability = forward_states[evidence_no]*backward_probability
- backward_states[evidence_no] = new_probability/new_probability.sum()
- print evidence_no, backward_probability
- backward_probability = backward(backward_probability, evidences[evidence_no-1])
- print "\nResults with smoothing"
- for state_no in range(len(backward_states)):
- print state_no, backward_states[state_no]
- print "\nViterbi"
- print viterbi()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement