- r"""
- Markov switching vector autoregressive model
- Parameters
- ----------
- endo: pandas DataFrame
- The endogenous variables.
- order: integer
- The order of MSVAR. Default to auto-selection according to information criteria.
- regime: integer
- The number of regimes. Default is 2.
- switch_omega: boolean
- Whether or not there is regime-specific heteroskedasticity, i.e.
- whether or not the error term has a switching variance. Default is
- False.
- """
    def __init__(self, endo, regime=2, order=None, switch_omega=False):
        # Sanity checks
        if not isinstance(endo, pd.DataFrame):
            raise TypeError('Use a pandas DataFrame with rows as observations and columns as variables.')
        if regime < 2:
            raise ValueError('Must have at least 2 regimes.')
        if not isinstance(switch_omega, bool):
            raise ValueError('Switching options must be boolean.')
        self.endo = endo
        self.regime = regime
        self.order = order
        self.switch_omega = switch_omega
        self.vars = self.endo.shape[1]
        self._base_model = VAR(self.endo)
        if self.order is None:
            # Auto-select the lag order: use the most frequently selected
            # order across the information criteria if it is small relative
            # to the sample and was chosen more than once; otherwise fall
            # back to the minimum selected order.
            lags_to_select = np.array(list(self._base_model.select_order().selected_orders.values()))
            mod = mode(lags_to_select, keepdims=True)
            min_lag = lags_to_select.min()
            self.order = (mod.mode[0]
                          if mod.mode[0] < 12 * (len(self.endo) / 100) ** 0.25 and mod.count[0] > 1
                          else min_lag)
        if len(self.endo) <= self.order:
            raise ValueError('Must have more observations than the order of the MSVAR.')
    def _initial_values(self):
        # Initial regime transition probabilities: random, with each column
        # normalised to sum to one
        P = np.random.rand(self.regime, self.regime)
        P = P / P.sum(axis=0)
        P[-1] = 1 - P[:-1].sum(axis=0)
        # One could inspect initP here to check whether the starting
        # transition matrix is good or bad; maybe something can be found there.
        self.initP = P
        # Initial unconditional regime probabilities: uniform
        rho = np.ones(self.regime) / self.regime
        rho[-1] = 1 - rho[:-1].sum()
        # Initial parameter values: the sample covariance matrix and the
        # single-regime VAR estimates, tiled across regimes
        omega = np.tile(self.endo.cov().values.flatten(), self.regime).reshape(self.regime, self.vars, self.vars)
        res = self._base_model.fit(self.order)
        params = (np.tile(res.params.T.values.flatten(), self.regime)
                  .reshape(self.regime, self.vars, self.order * self.vars + 1))
        self.__P = P
        self.__omega = omega
        self.__params = params
        self.__rho = rho
        return self.__P, self.__omega, self.__params, self.__rho
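    # Reading aid: columns of P sum to one, with P[i, j] = Prob(s_{t+1} = i |
    # s_t = j), so one-step-ahead probabilities are computed as P.dot(kstt).
    # For example, with two regimes,
    #     P = [[0.9, 0.2],
    #          [0.1, 0.8]]
    # regime 0 persists with probability 0.9 and regime 1 with probability 0.8.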
    def _filtered_prob(self):
        filtered = []
        filtered1 = []
        likelihood = []
        self.__boundary = (1e-06, 1 - 1e-06)
        lagged = lagmat(self.endo, self.order, use_pandas=True)
        lagged.insert(0, 'intercept', 1)
        self.__lagged = lagged
        kst1t = self.__rho
        for t in range(len(self.endo)):
            if t < self.order:
                # TODO use the unconditional probabilities instead
                kstt = self.__rho
                kst1t = self.__rho
                filtered.append(kstt)
                filtered1.append(kst1t)
            else:
                # Regime-conditional densities of the current observation
                dens = np.array([density.pdf(self.endo.iloc[t, :],
                                             self.__params[j].dot(np.array(lagged.iloc[t, :].T)),
                                             self.__omega[j])
                                 for j in range(self.regime)])
                # The likelihood contribution is the normalising constant of
                # the filtering step, i.e. sum_j xi_{t|t-1}(j) f(y_t | s_t = j)
                lik = np.sum(kst1t * dens)
                kstt = np.clip((kst1t * dens) / lik, *self.__boundary)
                kst1t = np.clip(self.__P.dot(kstt), *self.__boundary)
                filtered.append(kstt)
                filtered1.append(kst1t)
                likelihood.append(lik)
        self.__filtered = np.array(filtered)
        self.__filtered1 = np.array(filtered1)
        self.__log_likelihood = np.sum(np.log(likelihood))
        print('The log-likelihood is {}.'.format(self.__log_likelihood))
        assert not ((self.__filtered >= 1) | (self.__filtered <= 0)).any()
        assert not ((self.__filtered1 >= 1) | (self.__filtered1 <= 0)).any()
        return self.__filtered, self.__filtered1, self.__log_likelihood, self.__lagged
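    # The method above is Hamilton's filter. With dens_t the vector of
    # regime-conditional densities f(y_t | s_t = j, past), the recursion is
    #     xi_{t|t}   = (xi_{t|t-1} * dens_t) / 1'(xi_{t|t-1} * dens_t)
    #     xi_{t+1|t} = P xi_{t|t}
    # and the log-likelihood accumulates log 1'(xi_{t|t-1} * dens_t), where *
    # denotes element-wise multiplication.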
    def _smoothed_prob(self):
        # Kim's smoother, run backwards from the last filtered probability
        smoothed = [self.__filtered[-1]]
        for filt, pred in zip(self.__filtered[::-1][1:], self.__filtered1[::-1][1:]):
            kstT = np.clip(filt * self.__P.T.dot(smoothed[-1] / pred), *self.__boundary)
            smoothed.append(kstT)
        self.__smoothed = np.array(smoothed[::-1])
        assert not ((self.__smoothed >= 1) | (self.__smoothed <= 0)).any()
        return self.__smoothed
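    # The recursion above is Kim's smoother:
    #     xi_{t|T} = xi_{t|t} * P'(xi_{t+1|T} / xi_{t+1|t})
    # evaluated backwards from xi_{T|T}, with * and / element-wise; filtered1
    # stores xi_{t+1|t} at index t, which is why the two reversed sequences
    # line up one-for-one.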
    def _em(self):
        # Estimate regime transition probabilities
        new_P = self.__P.copy()
        for i in range(self.regime):
            for j in range(self.regime):
                new_P[i, j] = (np.sum(self.__filtered[self.order:][:, i] * self.__P[i, j] *
                                      self.__smoothed[self.order:][:, j] /
                                      self.__filtered1[self.order:][:, j]) /
                               np.sum(self.__smoothed[self.order:][:, i]))
                new_P[i, j] = np.clip(new_P[i, j], *self.__boundary)
        # NOTE: this may be where the problem is. Removing the add-up-to-one
        # constraint yields different results, at least over the first several
        # iterations, which suggests the 1-minus step somehow distorts the
        # convergence path. It is unclear whether this formula should
        # theoretically lie in (0, 1), which could be proven analytically if
        # every element of it is in (0, 1).
        new_P[-1] = 1 - new_P[:-1].sum(axis=0)
        print(new_P)
        # assert not (new_P >= 1).any()
        # assert not (new_P <= 0).any()
        # Estimate unconditional regime probabilities
        new_rho = self.__rho.copy()
        for i in range(self.regime):
            # According to Hamilton, this should be updated with the
            # self.order-th smoothed probability (index self.order - 1, since
            # Python counts from 0). While the estimates of P and params look
            # plausible, this one is completely off, with values near 0 for
            # one state and near 1 for another.
            new_rho[i] = self.__smoothed[0, i]
        assert not ((new_rho >= 1) | (new_rho <= 0)).any()
        new_rho = np.clip(new_rho, *self.__boundary)
        print(new_rho)
        # Estimate the parameters by weighted OLS, which maximises the
        # expected log-likelihood with respect to the smoothed probabilities
        # (the expectation step of EM). Hamilton's book gives a scalar-valued
        # example; this is the vector version. His 1990 paper gives vector
        # examples, but without autoregressive dynamics.
        new_params = self.__params.copy()
        for i in range(self.regime):
            w = self.__smoothed[:, i]
            yx = sum(np.outer(self.endo.iloc[j, :].values, self.__lagged.iloc[j, :].values) * w[j]
                     for j in range(len(self.endo)))
            xx = sum(np.outer(self.__lagged.iloc[j, :].values, self.__lagged.iloc[j, :].values) * w[j]
                     for j in range(len(self.endo)))
            new_params[i] = yx.dot(np.linalg.inv(xx))
        residuals = np.array([(self.endo.T.values - new_params[i].dot(self.__lagged.T.values)).T
                              for i in range(self.regime)])
        new_omega = self.__omega.copy()
        if self.switch_omega:
            for i in range(self.regime):
                new_omega[i] = (sum(np.outer(residuals[i][j], residuals[i][j]) * self.__smoothed[j, i]
                                    for j in range(len(self.endo))) /
                                np.sum(self.__smoothed[:, i]))
        else:
            for i in range(self.regime):
                new_omega[i] = (sum(np.outer(residuals[i][j], residuals[i][j])
                                    for j in range(len(self.endo))) /
                                (len(self.endo) - self.order))
        self.__new_P = new_P
        self.__new_rho = new_rho
        self.__new_params = new_params
        self.__new_omega = new_omega
        self.__residuals = residuals
        # assert sum(new_P[:, 0]) == 1
        # assert sum(new_P[:, 1]) == 1
        return self.__new_P, self.__new_rho, self.__new_params, self.__new_omega, self.__residuals
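    # The M-step above is weighted OLS per regime: with weights w_t equal to
    # the smoothed probability of regime i at time t, x_t the lagged regressor
    # vector (intercept included) and y_t the observation vector,
    #     B_i = (sum_t w_t y_t x_t') (sum_t w_t x_t x_t')^{-1}
    # which maximises the expected complete-data log-likelihood with respect
    # to the regime-i coefficients.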
    def fit(self):
        self._initial_values()
        self._filtered_prob()
        self._smoothed_prob()
        self._em()
        convergence_criteria = 1e-06

        def _still_moving():
            # Iterate until no parameter changes by more than the criterion
            P_conditions = (np.abs(self.__new_P - self.__P) > convergence_criteria).any()
            rho_conditions = (np.abs(self.__new_rho - self.__rho) > convergence_criteria).any()
            params_conditions = (np.abs(self.__new_params - self.__params) > convergence_criteria).any()
            omega_conditions = (np.abs(self.__new_omega - self.__omega) > convergence_criteria).any()
            print(P_conditions, rho_conditions, params_conditions, omega_conditions)
            return P_conditions or rho_conditions or params_conditions or omega_conditions

        while _still_moving():
            # NOTE: when the conditions are printed, it turns out that P never
            # satisfies the criterion; even when a local maximum is reached, P
            # does not converge, which may be related to the 1-minus
            # constraint. Alternatively, the iteration can often be ended with
            # an MLE-style stopping rule such as self.__log_likelihood <
            # (some value that depends on the data).
            self.__P = self.__new_P
            self.__rho = self.__new_rho
            self.__params = self.__new_params
            self.__omega = self.__new_omega
            self._filtered_prob()
            self._smoothed_prob()
            self._em()
        self.P = self.__new_P
        self.rho = self.__new_rho
        self.params = self.__new_params
        self.omega = self.__new_omega
        self.residuals = self.__residuals
        self.filtered, _, self.log_likelihood, _ = self._filtered_prob()
        self.smoothed = self._smoothed_prob()
        return MSVARResult  # results class, assumed to be defined elsewhere
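# A minimal usage sketch, assuming MSVARResult (referenced by fit) is defined
# elsewhere in the project. The data here is arbitrary illustrative noise, and
# convergence of fit() on such data is not guaranteed.
if __name__ == '__main__':
    np.random.seed(0)
    # Two arbitrary stationary series, rows as observations
    data = pd.DataFrame(np.random.randn(200, 2), columns=['y1', 'y2'])
    model = MSVAR(data, regime=2, order=1, switch_omega=True)
    model.fit()
    print(model.P)         # estimated regime transition probabilities
    print(model.smoothed)  # smoothed regime probabilities, one row per observation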