Skip to content

Instantly share code, notes, and snippets.

@jcorrius
Created September 24, 2018 06:29
Show Gist options
  • Save jcorrius/23ffbe611b734c7c81cc4d0e2fb4d50a to your computer and use it in GitHub Desktop.
Calculate Johansen Test for the given time series
import numpy as np
from statsmodels.tsa.tsatools import lagmat
def johansen(ts, lags):
    """
    Calculate the Johansen cointegration test for the given time series.

    Parameters
    ----------
    ts : array_like, shape (nobs, m)
        Observed series: nobs observations of m variables.
    lags : int
        Number of lagged differences used in the auxiliary regressions.

    Returns
    -------
    eigenvectors : ndarray
        Cointegrating vectors (columns), ordered by decreasing eigenvalue.
    rejected_r_values : list of int
        Values of r for which the null hypothesis of at most r
        cointegrating relations is rejected at the 95% level.
    """
    # Make sure we are working with arrays, convert if necessary
    ts = np.asarray(ts)
    # nobs is the number of observations, m is the number of series
    nobs, m = ts.shape
    # Obtain the eigenvalues and corresponding cointegrating vectors.
    # BUG FIX: maximum_likelihood_estimation returns (eigenvalues,
    # eigenvectors) — the original unpacked them in the opposite order,
    # so h_test was handed the eigenvector matrix as "eigenvalues" and
    # the caller received eigenvalues labeled as eigenvectors.
    eigenvalues, eigenvectors = maximum_likelihood_estimation(ts, lags)
    # Table of trace-statistic critical values, one row per m - r
    # (1..12); columns are the 90% / 95% / 99% confidence levels.
    critical_values_string = """2.71 3.84 6.63
13.43 15.50 19.94
27.07 29.80 35.46
44.49 47.86 54.68
65.82 69.82 77.82
91.11 95.75 104.96
120.37 125.61 135.97
153.63 159.53 171.09
190.88 197.37 210.06
232.11 239.25 253.24
277.38 285.14 300.29
326.53 334.98 351.25"""
    select_critical_values = np.array(
        critical_values_string.split(),
        float).reshape(-1, 3)
    # Use the 95% column for the hypothesis tests.
    critical_values = select_critical_values[:, 1]
    # Collect the numbers of cointegrating relations for which
    # the null hypothesis is rejected.
    rejected_r_values = []
    for r in range(m):
        if h_test(eigenvalues, r, nobs, lags, critical_values):
            rejected_r_values.append(r)
    return eigenvectors, rejected_r_values
def h_test(eigenvalues, r, nobs, lags, critical_values):
    """
    Run one step of the Johansen trace test: test the null hypothesis
    of at most ``r`` cointegrating relations.

    Parameters
    ----------
    eigenvalues : array_like
        Eigenvalues from the maximum-likelihood estimation, one per series.
    r : int
        Hypothesized maximum number of cointegrating relations.
    nobs : int
        Number of observations in the original series.
    lags : int
        Number of lagged differences used in the regressions.
    critical_values : array_like
        Critical values indexed by ``m - r - 1``.

    Returns
    -------
    bool
        True when the trace statistic exceeds the critical value,
        i.e. the null hypothesis is rejected.
    """
    # Effective sample size after differencing and lagging.
    sample_size = nobs - lags - 1
    m = len(eigenvalues)
    # Trace statistic over the m - r smallest eigenvalues.
    log_terms = np.log(np.ones(m) - eigenvalues)
    trace_statistic = -sample_size * log_terms[r:].sum()
    # Critical value matching the m - r relations still under test.
    threshold = critical_values[m - r - 1]
    return bool(trace_statistic > threshold)
def maximum_likelihood_estimation(ts, lags):
    """
    Obtain the eigenvalues and cointegrating vectors for the Johansen
    procedure via reduced-rank regression.

    Parameters
    ----------
    ts : array_like, shape (nobs, m)
        Observed series: nobs observations of m variables.
    lags : int
        Number of lagged differences included in the regressions.

    Returns
    -------
    (eigenvalues, eigenvectors)
        Eigenvalues sorted in decreasing order and the matching
        eigenvectors as columns, normalized via a Cholesky factor so
        that ``evec.T @ Svv @ evec`` is the identity.
    """
    # Make sure we are working with an array, convert if necessary.
    data = np.asarray(ts)
    # First differences of the series.
    diffs = np.diff(data, axis=0)
    # Lagged differences (the regressors) and the first lag of the levels.
    lagged_diffs = lagmat(diffs, lags, trim='both')
    levels_lag = lagmat(data, 1, trim='both')
    # Align the dependent variables with the lagged regressors.
    diffs = diffs[lags:]
    levels_lag = levels_lag[lags:]
    # Add an intercept column to the regressors.
    intercept = np.ones((lagged_diffs.shape[0], 1))
    lagged_diffs = np.append(lagged_diffs, intercept, axis=1)
    # Residuals of regressing the diffs (u) and the lagged levels (v)
    # on the lagged differences, computed via the pseudo-inverse.
    pinv = np.linalg.pinv(lagged_diffs)
    u = diffs - np.dot(lagged_diffs, np.dot(pinv, diffs))
    v = levels_lag - np.dot(lagged_diffs, np.dot(pinv, levels_lag))
    # Covariance matrices of the residuals.
    sample_size = lagged_diffs.shape[0]
    Svv = np.dot(v.T, v) / sample_size
    Suu = np.dot(u.T, u) / sample_size
    Suv = np.dot(u.T, v) / sample_size
    Svu = Suv.T
    # NOTE(review): a singular Svv or Suu makes np.linalg.inv raise;
    # this is not handled here (matches the original behavior).
    cov_prod = np.dot(
        np.linalg.inv(Svv),
        np.dot(Svu, np.dot(np.linalg.inv(Suu), Suv)))
    # Eigen-decomposition of the product of covariances.
    eigenvalues, eigenvectors = np.linalg.eig(cov_prod)
    # Normalize the eigenvectors with the Cholesky factor of
    # evec.T * Svv * evec.
    gram = np.dot(eigenvectors.T, np.dot(Svv, eigenvectors))
    cholesky_factor = np.linalg.cholesky(gram)
    eigenvectors = np.dot(eigenvectors, np.linalg.inv(cholesky_factor.T))
    # Order eigenvalues (and matching eigenvectors) in decreasing order.
    order = np.flipud(np.argsort(eigenvalues))
    return eigenvalues[order], eigenvectors[:, order]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment