Last active
August 9, 2022 20:12
-
-
Save atksh/a7f8f72f6e87500f447d1c8d60e38e0e to your computer and use it in GitHub Desktop.
Estimating a two-component mixed normal distribution in PyMC3, and performing model selection with WBIC between a plain normal model and the mixed normal model.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import warnings | |
warnings.filterwarnings('ignore') | |
import pymc3 as pm | |
from pymc3.distributions.dist_math import bound | |
import theano.tensor as tt | |
import theano | |
import numpy as np | |
np.random.seed(seed=32)
## PARAMS
N = 100        # number of observations
a_true = 0.6   # true fraction of samples drawn from the first component
mean1 = 0      # mean of component 1
mean2 = 3      # mean of component 2
sd = 1         # common standard deviation of both components

## TOY DATA: a two-component normal mixture built by shifting standard normals
Y = np.random.randn(N)
split = int(N * a_true)  # first `split` samples belong to component 1
Y[:split] = Y[:split] * sd + mean1
Y[split:] = Y[split:] * sd + mean2
## CUSTOM DISTRIBUTIONS | |
def normal_logp(value, mu, sd):
    """Elementwise log-density of Normal(mu, sd) at `value` (symbolic Theano expression)."""
    precision = sd ** -2  # tau = 1 / sd^2
    return (tt.log(precision / np.pi / 2.) - precision * (value - mu) ** 2) / 2
class NormalWithInverseTemperature(pm.Continuous):
    """Normal distribution whose log-density is scaled by an inverse temperature `beta`.

    With beta = 1/log(n) the tempered posterior expectation of the total
    log-likelihood yields the WBIC estimate.
    """

    def __init__(self, beta=1, mu=0, sd=1, **kwargs):
        self.beta = beta
        self.mu = mu
        self.sd = sd
        self.tau = sd ** -2
        super(NormalWithInverseTemperature, self).__init__(**kwargs)

    def logp(self, value):
        # Tempered log-density: beta * log N(value | mu, sd); -inf unless sd > 0.
        return bound(normal_logp(value, self.mu, self.sd), self.sd > 0) * self.beta
class MixtureNormalWithInverseTemperature(pm.Continuous):
    """Two-component normal mixture with log-density scaled by an inverse temperature `beta`.

    Component 1 is N(0, sd) with weight (1 - a); component 2 is N(mu, sd)
    with weight a. The first component's mean is fixed at 0 by construction.
    """

    def __init__(self, a, mu, sd, beta=1, **kwargs):
        self.a = a
        self.mu = mu
        self.sd = sd
        self.beta = beta
        super(MixtureNormalWithInverseTemperature, self).__init__(**kwargs)

    def logp(self, value):
        # log-sum-exp of the two weighted component log-densities, then tempered.
        component_logps = pm.math.stack([
            pm.math.log(1 - self.a) + normal_logp(value, 0, self.sd),
            pm.math.log(self.a) + normal_logp(value, self.mu, self.sd)
        ])
        return pm.math.logsumexp(component_logps) * self.beta
## DEFINE MODEL
# WBIC inverse temperature: beta = 1 / log(n).
beta = 1 / np.log(N)

### Normal Model
with pm.Model() as model1:
    mu = pm.Normal("mu", mu=0, sd=1e4)
    sd = pm.HalfStudentT("sd", nu=3, sd=1e2)
    # One tempered likelihood term per observation, so each per-observation
    # log-likelihood can be recovered afterwards.
    obs = dict()
    for idx in range(N):
        obs[idx] = NormalWithInverseTemperature(f"obs_{idx}", mu=mu, sd=sd, observed=Y[idx], beta=beta)
    # Posterior-predictive draw under the same tempered likelihood.
    y_pred = NormalWithInverseTemperature("pred", mu=mu, sd=sd, beta=beta, testval=0)
    # Un-tempered per-observation log-likelihoods (divide the beta back out),
    # stored in the trace so WBIC can be computed later.
    log_likes = pm.Deterministic("log_likes", pm.math.stack([obs[idx].logpt / beta for idx in range(N)]))
    trace1 = pm.sample(2000, tune=500, chains=4)
### Mixture Normal Model, fixed sigma = 1 and one of mu = 0
with pm.Model() as model2:
    a = pm.Uniform('a', lower=0, upper=1)
    mu = pm.Normal('mu', mu=0, sd=1e4)
    # One tempered mixture-likelihood term per observation (see model1).
    obs = dict()
    for idx in range(N):
        obs[idx] = MixtureNormalWithInverseTemperature(f"obs_{idx}", a=a, mu=mu, sd=1, observed=Y[idx], beta=beta)
    # Posterior-predictive draw under the same tempered likelihood.
    y_pred = MixtureNormalWithInverseTemperature("pred", a=a, mu=mu, sd=1, testval=0, beta=beta)
    # Un-tempered per-observation log-likelihoods, stored for WBIC.
    log_likes = pm.Deterministic("log_likes", pm.math.stack([obs[idx].logpt / beta for idx in range(N)]))
    trace2 = pm.sample(2000, tune=500, chains=4)
## CALCULATE WBIC
def wbic(log_likelihood):
    """WBIC: minus the tempered-posterior average of the total log-likelihood.

    `log_likelihood` is a (draws, observations) array of per-draw,
    per-observation log-likelihoods sampled at beta = 1/log(n).
    """
    per_draw_total = np.sum(log_likelihood, axis=1)  # total log-lik of each draw
    return -np.mean(per_draw_total)
# Report WBIC for both models; the lower value indicates the preferred model.
print(f'model1|WBIC = {wbic(trace1["log_likes"]):.3f}')
print(f'model2|WBIC = {wbic(trace2["log_likes"]):.3f}')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment