Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import numpy as np
- from statsmodels.tsa.tsatools import lagmat
def johansen(ts, lags, significance=0.05):
    """
    Calculate the Johansen cointegration test for the given time series.

    Parameters
    ----------
    ts : array_like, shape (nobs, m)
        Observations of the m series, one row per time step.
    lags : int
        Number of lagged difference terms used in the auxiliary
        regressions.
    significance : float, optional
        Significance level of the hypothesis tests: 0.10, 0.05 or 0.01.
        The default, 0.05, reproduces the previous hard-coded behaviour.

    Returns
    -------
    eigenvectors : numpy.ndarray
        Cointegrating vectors, ordered by decreasing eigenvalue.
    rejected_r_values : list of int
        Each r for which the null hypothesis of at most r cointegrating
        relations is rejected at the requested significance level.

    Raises
    ------
    ValueError
        If ``significance`` is not one of the tabulated levels.
    """
    # Make sure we are working with arrays, convert if necessary
    ts = np.asarray(ts)
    # nobs is the number of observations, m is the number of series
    nobs, m = ts.shape
    # Obtain the cointegrating vectors and corresponding eigenvalues
    eigenvectors, eigenvalues = maximum_likelihood_estimation(ts, lags)
    # Trace-statistic critical values for m - r = 1..12; the three
    # columns are the 90%, 95% and 99% quantiles respectively.
    critical_values_string = """2.71 3.84 6.63
13.43 15.50 19.94
27.07 29.80 35.46
44.49 47.86 54.68
65.82 69.82 77.82
91.11 95.75 104.96
120.37 125.61 135.97
153.63 159.53 171.09
190.88 197.37 210.06
232.11 239.25 253.24
277.38 285.14 300.29
326.53 334.98 351.25"""
    select_critical_values = np.array(
        critical_values_string.split(),
        float).reshape(-1, 3)
    # Map the requested significance level onto its table column.
    level_column = {0.10: 0, 0.05: 1, 0.01: 2}
    if significance not in level_column:
        raise ValueError(
            "significance must be 0.10, 0.05 or 0.01, got %r" % significance)
    critical_values = select_critical_values[:, level_column[significance]]
    # Collect the numbers of cointegrating relations for which the
    # null hypothesis is rejected.
    rejected_r_values = [
        r for r in range(m)
        if h_test(eigenvalues, r, nobs, lags, critical_values)]
    return eigenvectors, rejected_r_values
def h_test(eigenvalues, r, nobs, lags, critical_values):
    """
    Execute one trace-statistic hypothesis test of the Johansen procedure.

    Parameters
    ----------
    eigenvalues : array_like, shape (m,)
        Eigenvalues from the maximum-likelihood estimation, sorted in
        decreasing order.
    r : int
        Null hypothesis: at most ``r`` cointegrating relations.
    nobs : int
        Number of observations in the original series.
    lags : int
        Number of lagged differences used in the regressions.
    critical_values : array_like
        Critical values indexed so that row ``k - 1`` corresponds to
        ``k`` remaining relations.

    Returns
    -------
    bool
        True when the null hypothesis is rejected.
    """
    # Effective sample size after differencing and lagging.
    t = nobs - lags - 1
    m = len(eigenvalues)
    # Trace statistic: -T * sum(log(1 - lambda_i)) over the m - r
    # smallest eigenvalues.
    statistic = -t * np.sum(np.log(1.0 - np.asarray(eigenvalues))[r:])
    # Row m - r - 1 of the table holds the critical value for m - r
    # remaining relations.
    critical_value = critical_values[m - r - 1]
    # Reject when the statistic exceeds the critical value.
    return bool(statistic > critical_value)
def _lagged(x, maxlag):
    """
    Build the lag matrix [x_{t-1}, ..., x_{t-maxlag}], columns grouped
    by lag, with invalid rows trimmed from both ends — equivalent to
    statsmodels' ``lagmat(x, maxlag, trim='both')`` for 2-D input.
    """
    nobs = x.shape[0]
    return np.hstack([x[maxlag - j:nobs - j] for j in range(1, maxlag + 1)])


def maximum_likelihood_estimation(ts, lags):
    """
    Obtain the cointegrating vectors and corresponding eigenvalues.

    Parameters
    ----------
    ts : array_like, shape (nobs, m)
        Observations of the m series, one row per time step.
    lags : int
        Number of lagged differences used in the regressions.

    Returns
    -------
    eigenvalues : numpy.ndarray
        Eigenvalues sorted in decreasing order.
    eigenvectors : numpy.ndarray
        Corresponding cointegrating vectors, one per column.
    """
    # Make sure we are working with an array, convert if necessary
    ts = np.asarray(ts)
    # Calculate the differences of ts
    ts_diff = np.diff(ts, axis=0)
    # Calculate lags of ts_diff (plain slice stacking; statsmodels'
    # lagmat is not needed for this).
    ts_diff_lags = _lagged(ts_diff, lags)
    # First lag of ts
    ts_lag = _lagged(ts, 1)
    # Trim ts_diff and ts_lag so all regressions share the same rows
    ts_diff = ts_diff[lags:]
    ts_lag = ts_lag[lags:]
    # Include an intercept in the regressions
    ones = np.ones((ts_diff_lags.shape[0], 1))
    ts_diff_lags = np.hstack((ts_diff_lags, ones))
    # Residuals of regressing the diffs and the lags on ts_diff_lags
    # (pinv gives the least-squares projection).
    inverse = np.linalg.pinv(ts_diff_lags)
    u = ts_diff - np.dot(ts_diff_lags, np.dot(inverse, ts_diff))
    v = ts_lag - np.dot(ts_diff_lags, np.dot(inverse, ts_lag))
    # Covariance matrices of the calculated residuals
    t = ts_diff_lags.shape[0]
    Svv = np.dot(v.T, v) / t
    Suu = np.dot(u.T, u) / t
    Suv = np.dot(u.T, v) / t
    Svu = Suv.T
    # ToDo: check for singular matrices and exit
    Svv_inv = np.linalg.inv(Svv)
    Suu_inv = np.linalg.inv(Suu)
    # Eigenvalues and eigenvectors of the product of covariances
    cov_prod = np.dot(Svv_inv, np.dot(Svu, np.dot(Suu_inv, Suv)))
    eigenvalues, eigenvectors = np.linalg.eig(cov_prod)
    # Normalise the eigenvectors via the Cholesky factor of E' Svv E
    evec_Svv_evec = np.dot(eigenvectors.T, np.dot(Svv, eigenvectors))
    cholesky_factor = np.linalg.cholesky(evec_Svv_evec)
    eigenvectors = np.dot(eigenvectors, np.linalg.inv(cholesky_factor.T))
    # Order the eigenvalues (and matching eigenvectors) in decreasing order
    indices_ordered = np.argsort(eigenvalues)
    indices_ordered = np.flipud(indices_ordered)
    # Return the calculated values
    return eigenvalues[indices_ordered], eigenvectors[:, indices_ordered]
Add Comment
Please, Sign In to add comment