Created
September 2, 2020 16:45
-
-
Save brandonwillard/89e61ee0ca68b979d8ed6fef5be18cbe to your computer and use it in GitHub Desktop.
Simple time-varying transition matrix example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pymc3 as pm | |
import numpy as np | |
import pandas as pd | |
import patsy | |
import matplotlib.pyplot as plt | |
import theano | |
import theano.tensor as tt | |
from theano.printing import debugprint as tt_dprint | |
from pymc3_hmm.utils import multilogit_inv | |
from pymc3_hmm.distributions import HMMStateSeq, SwitchingProcess | |
from pymc3_hmm.step_methods import FFBSStep | |
# theano.config.cxx = "" | |
theano.compute_test_value = "warn" | |
np.random.seed(2032) | |
start_date = pd.Timestamp('2019-12-29 00:00:00') | |
time_index = pd.date_range(start=start_date, | |
end=start_date + pd.Timedelta('100W'), | |
closed='left', | |
freq='1d') | |
X_ind_df = pd.DataFrame({ | |
'weekday': time_index.weekday, | |
# 'hour': time_index.hour | |
}, index=time_index) | |
# formula_str = "~ 1 + C(weekday) + C(hour)" | |
formula_str = "~ 1 + C(weekday)" | |
X_df = patsy.dmatrix(formula_str, X_ind_df, return_type="dataframe") | |
xi_0_np = pd.Series( | |
# The coefficients used to compute the state zero-to-zero transition probabilities | |
np.array([10.0, -20.0, -20.0, 0.0, 0.0, -20.0, -20.0]), | |
index=X_df.columns) | |
xi_1_np = pd.Series( | |
# The coefficients for the state one-to-zero transition probabilities | |
np.array([-10.0, 20.0, 20.0, 0.0, 0.0, 20.0, 20.0]), | |
index=X_df.columns) | |
xis_np = np.stack([xi_0_np, xi_1_np], axis=1)[..., None] | |
xis_np.squeeze() | |
z_np = np.tensordot(X_df.values, xis_np, axes=((1,), (0,))) | |
z_np.squeeze()[:10] | |
P_np = multilogit_inv(z_np) | |
# X_ind_df[:10] | |
P_np.round(2)[:10] | |
p_0 = np.r_[0.0, 1.0] | |
S_np = HMMStateSeq.dist(P_np, p_0, shape=P_np.shape[-3]).random() | |
y_np = SwitchingProcess.dist([pm.Constant.dist(0), pm.Constant.dist(1)], S_np).random() | |
y_np[:30] | |
with pm.Model() as test_model: | |
xis_rv = pm.Normal("xi", 0.0, 10.0, shape=(X_df.shape[-1], 2, 1)) | |
z_tt = tt.tensordot(X_df.values, | |
# A weird hack for a broadcast mismatch issue in PyMC3 | |
tt.unbroadcast(xis_rv, 2), | |
axes=((1,), (0,))) | |
z_tt.name = "z" | |
P_tt = multilogit_inv(z_tt) | |
P_tt.name = "P" | |
P_rv = pm.Deterministic('P_t', P_tt) | |
pi_0_tt = tt.as_tensor_variable(np.r_[0.0, 1.0]) # pm.Dirichlet("pi", np.ones(2)) | |
pi_0_tt.name = "pi_0" | |
S_rv = HMMStateSeq("S_t", P_rv, pi_0_tt, shape=y_np.shape[-1]) | |
Y_rv = SwitchingProcess("Y_t", [pm.Constant.dist(0), pm.Constant.dist(1)], S_rv, observed=y_np) | |
with test_model: | |
ffbs = FFBSStep([S_rv]) | |
trace = pm.sample(2000, | |
step=[ffbs], | |
cores=1, | |
chains=1, | |
tune=500, | |
n_init=500) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment