Last active
October 27, 2022 17:57
-
-
Save cs224/9a19b4ba2c7511e317be90c32a4d40d7 to your computer and use it in GitHub Desktop.
Simple Bayesian Network via Monte Carlo Markov Chain in PyMC3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math | |
import pyjags | |
import numpy as np | |
import pandas as pd | |
np.random.seed(0) | |
np.set_printoptions(precision=3) | |
def pyjags_trace(): | |
code = ''' | |
model { | |
rain ~ dbern(0.2) | |
sprinkler_p <- ifelse(rain, 0.01, 0.40) | |
sprinkler ~ dbern(sprinkler_p) | |
grass_wet_p <- ifelse(rain, ifelse(sprinkler, 0.99, 0.80), ifelse(sprinkler, 0.90, 0.0)) | |
grass_wet ~ dbern(grass_wet_p) | |
} | |
''' | |
model = pyjags.Model(code, data=dict(), chains=4) | |
samples = model.sample(5000, vars=['rain', 'sprinkler_p', 'sprinkler', 'grass_wet_p', 'grass_wet']) | |
trace = pd.Panel({k: v.squeeze(0) for k, v in samples.items()}) | |
trace.axes[0].name = 'Variable' | |
trace.axes[1].name = 'Iteration' | |
trace.axes[2].name = 'Chain' | |
return trace | |
def pyjags_trace_with_evidence(): | |
code = ''' | |
model { | |
rain ~ dbern(0.2) | |
sprinkler_p <- ifelse(rain, 0.01, 0.40) | |
sprinkler ~ dbern(sprinkler_p) | |
grass_wet_p <- ifelse(rain, ifelse(sprinkler, 0.99, 0.80), ifelse(sprinkler, 0.90, 0.0)) | |
grass_wet ~ dbern(grass_wet_p) | |
} | |
''' | |
grass_wet = 1.0 | |
model = pyjags.Model(code, data=dict(grass_wet=grass_wet), init=dict(rain=1, sprinkler=1), chains=4) | |
samples = model.sample(5000, vars=['rain', 'sprinkler_p', 'sprinkler', 'grass_wet_p', 'grass_wet']) | |
trace = pd.Panel({k: v.squeeze(0) for k, v in samples.items()}) | |
trace.axes[0].name = 'Variable' | |
trace.axes[1].name = 'Iteration' | |
trace.axes[2].name = 'Chain' | |
return trace | |
trace1 = pyjags_trace() | |
trace2 = pyjags_trace_with_evidence() | |
def trace_values(ldf): | |
p_rain_wet = float(ldf[(ldf['rain'] == 1) & (ldf['grass_wet'] == 1)].shape[0]) / ldf[ldf['grass_wet'] == 1].shape[0] | |
p_sprinkler_wet = float(ldf[(ldf['sprinkler'] == 1) & (ldf['grass_wet'] == 1)].shape[0]) / ldf[(ldf['grass_wet'] == 1)].shape[0] | |
d = float(ldf[(ldf['sprinkler'] == 0) & (ldf['rain'] == 0)].shape[0]) | |
if math.isclose(d, 0.): | |
p_not_sprinkler_rain_wet = None | |
else: | |
p_not_sprinkler_rain_wet = float(ldf[(ldf['sprinkler'] == 0) & (ldf['rain'] == 0) & (ldf['grass_wet'] == 1)].shape[0]) / d | |
return (p_rain_wet, p_sprinkler_wet, p_not_sprinkler_rain_wet) | |
print(trace_values(trace1.to_frame())) | |
print(trace_values(trace2.to_frame())) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math | |
import pymc | |
import numpy as np | |
import pandas as pd | |
import lea | |
np.random.seed(124) | |
rain = lea.B(20,100) | |
sprinkler = lea.Lea.if_(rain, lea.B(1,100), lea.B(40,100)) | |
grassWet = lea.Lea.buildCPT(( ~sprinkler & ~rain, False), | |
( ~sprinkler & rain, lea.B(80,100)), | |
( sprinkler & ~rain, lea.B(90,100)), | |
( sprinkler & rain, lea.B(99,100))) | |
# lea.P(grassWet & sprinkler & rain) # 99/50000 | |
# lea.Pf(grassWet & sprinkler & rain) # 0.00198 | |
# lea.P(rain.given(grassWet)) # 891/2491 | |
# lea.Pf(rain.given(grassWet)) # 0.3576876756322762 | |
# lea.Pf(rain.given(~grassWet)) # 0.07182480693230847 | |
# lea.Pf(grassWet.given(sprinkler&~rain)) # 0.9 | |
# lea.Pf(grassWet.given(~sprinkler&~rain)) # 0.0 | |
niter = 10000 | |
def pymc_trace(): | |
# Initialization | |
observed_values = [1.] | |
rain = pymc.Bernoulli('rain', .2, value=np.ones(len(observed_values))) | |
sprinkler_p = pymc.Lambda('sprinkler_p', lambda rain=rain: np.where(rain, .01, .4)) | |
# "Real" sprinkler varible | |
sprinkler = pymc.Bernoulli('sprinkler', sprinkler_p, value=np.ones(len(observed_values))) | |
grass_wet_p = pymc.Lambda('grass_wet_p', lambda sprinkler=sprinkler, rain=rain: np.where(sprinkler, np.where(rain, .99, .9), np.where(rain, .8, 0.0))) | |
# grass_wet = pymc.Bernoulli('grass_wet', p_grass_wet, value=observed_values, observed=True) | |
grass_wet = pymc.Bernoulli('grass_wet', grass_wet_p, value=np.ones(len(observed_values))) # , value=np.ones(len(observed_values)) | |
model = pymc.Model([grass_wet, grass_wet_p, sprinkler, sprinkler_p, rain]) | |
mcmc = pymc.MCMC(model) | |
mcmc.sample(niter, 2000) | |
# pymc.Matplot.plot(mcmc) | |
# | |
# geweke = pymc.geweke(mcmc) | |
# pymc.Matplot.geweke_plot(geweke) | |
dictionary = { | |
'Rain': [1 if ii[0] else 0 for ii in mcmc.trace('rain')[:].tolist() ], | |
'Sprinkler': [1 if ii[0] else 0 for ii in mcmc.trace('sprinkler')[:].tolist() ], | |
'Grass Wet': [1 if ii[0] else 0 for ii in mcmc.trace('grass_wet')[:].tolist() ], | |
'Sprinkler Probability': [ii[0] for ii in mcmc.trace('sprinkler_p')[:].tolist()], | |
'Grass Wet Probability': [ii[0] for ii in mcmc.trace('grass_wet_p')[:].tolist()], | |
} | |
ldf = pd.DataFrame(dictionary) | |
return ldf | |
df1 = pymc_trace() | |
print(df1.head()) | |
dfX = pymc_trace() # necessary to get "good" values for pymc_trace_with_evidence() | |
def pymc_trace_with_evidence(): | |
# Initialization | |
observed_values = [1.] | |
rain = pymc.Bernoulli('rain', .2, value=np.ones(len(observed_values))) # , value=np.ones(len(observed_values)) | |
sprinkler_p = pymc.Lambda('sprinkler_p', lambda rain=rain: np.where(rain, .01, .4)) | |
# "Real" sprinkler varible | |
sprinkler = pymc.Bernoulli('sprinkler', sprinkler_p, value=np.ones(len(observed_values))) # , value=np.ones(len(observed_values)) | |
grass_wet_p = pymc.Lambda('grass_wet_p', lambda sprinkler=sprinkler, rain=rain: np.where(sprinkler, np.where(rain, .99, .9), np.where(rain, .8, 0.0))) | |
# grass_wet = pymc.Bernoulli('grass_wet', p_grass_wet, value=observed_values, observed=True) | |
grass_wet = pymc.Bernoulli('grass_wet', grass_wet_p, value = observed_values, observed = True) # , value=np.ones(len(observed_values)) | |
model = pymc.Model([grass_wet, grass_wet_p, sprinkler, sprinkler_p, rain]) | |
mcmc = pymc.MCMC(model) | |
mcmc.sample(niter, 2000) | |
# pymc.Matplot.plot(mcmc) | |
# | |
# geweke = pymc.geweke(mcmc) | |
# pymc.Matplot.geweke_plot(geweke) | |
dictionary = { | |
'Rain': [1 if ii[0] else 0 for ii in mcmc.trace('rain')[:].tolist() ], | |
'Sprinkler': [1 if ii[0] else 0 for ii in mcmc.trace('sprinkler')[:].tolist() ], | |
'Grass Wet': [1 for ii in mcmc.trace('grass_wet_p')[:].tolist() ], # always true | |
'Sprinkler Probability': [ii[0] for ii in mcmc.trace('sprinkler_p')[:].tolist()], | |
'Grass Wet Probability': [ii[0] for ii in mcmc.trace('grass_wet_p')[:].tolist()], | |
} | |
ldf = pd.DataFrame(dictionary) | |
return ldf | |
df2 = pymc_trace_with_evidence() | |
print(df2.head()) | |
def pymc_trace_values(ldf): | |
# p_rain = ldf[(ldf['Rain'] == 1)].shape[0] / ldf.shape[0] | |
# print(p_rain) | |
# p_grass_wet = ldf[(ldf['Grass Wet'] == 1)].shape[0] / ldf.shape[0] | |
# print(p_grass_wet) | |
p_rain_wet = float(ldf[(ldf['Rain'] == 1) & (ldf['Grass Wet'] == 1)].shape[0]) / ldf[ldf['Grass Wet'] == 1].shape[0] | |
p_sprinkler_wet = float(ldf[(ldf['Sprinkler'] == 1) & (ldf['Grass Wet'] == 1)].shape[0]) / ldf[(ldf['Grass Wet'] == 1)].shape[0] | |
d = float(ldf[(ldf['Sprinkler'] == 0) & (ldf['Rain'] == 0)].shape[0]) | |
if math.isclose(d, 0.): | |
p_not_sprinkler_rain_wet = None | |
else: | |
p_not_sprinkler_rain_wet = float(ldf[(ldf['Sprinkler'] == 0) & (ldf['Rain'] == 0) & (ldf['Grass Wet'] > 0.5)].shape[0]) / d | |
return (p_rain_wet, p_sprinkler_wet, p_not_sprinkler_rain_wet) | |
# Given grass is wet, what is the probability that it was rained? | |
a = lea.Pf(rain.given(grassWet)) | |
# Given grass is wet, what is the probability that sprinkler was opened? | |
b = lea.Pf(sprinkler.given(grassWet)) | |
# Given that it did not rain and that the sprinkler is off what is the probability that the grass is wet? | |
c = lea.Pf(grassWet.given(~rain, ~sprinkler)) | |
print((a,b,c)) | |
print(pymc_trace_values(df1)) | |
print(pymc_trace_values(df2)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Original example in PyMC2: | |
# https://bugra.github.io/work/notes/2014-05-23/simple-bayesian-network-via-monte-carlo-markov-chain-mcmc-pymc/ | |
import pandas as pd | |
import pymc3 as pm | |
model = pm.Model() | |
with model: | |
rain = pm.Bernoulli('rain', 0.2) | |
sprinkler_p = pm.Deterministic('sprinkler_p', pm.math.switch(rain, 0.01, 0.40)) | |
sprinkler = pm.Bernoulli('sprinkler', sprinkler_p) | |
grass_wet_p = pm.Deterministic('grass_wet_p', pm.math.switch(rain, pm.math.switch(sprinkler, 0.99, 0.80), pm.math.switch(sprinkler, 0.90, 0.0))) | |
grass_wet = pm.Bernoulli('grass_wet', grass_wet_p) | |
# start = pm.find_MAP() # Use MAP estimate (optimization) as the initial state for MCMC | |
step = pm.Metropolis() | |
trace = pm.sample(100000, step=step, tune=5000, random_seed=123, progressbar=True) # init=start, | |
pm.traceplot(trace) | |
dictionary = { | |
'Rain': [1 if ii else 0 for ii in trace['rain'].tolist() ], | |
'Sprinkler': [1 if ii else 0 for ii in trace['sprinkler'].tolist() ], | |
'Sprinkler Probability': [ii for ii in trace['sprinkler_p'].tolist()], | |
'Grass Wet Probability': [ii for ii in trace['grass_wet_p'].tolist()], | |
} | |
df = pd.DataFrame(dictionary) | |
df.head() | |
# Given grass is wet, what is the probability that it was rained? | |
p_rain_wet = float(df[(df['Rain'] == 1) & (df['Grass Wet Probability'] > 0.5)].shape[0]) / df[df['Grass Wet Probability'] > 0.5].shape[0] | |
print(p_rain_wet) | |
# Given grass is wet, what is the probability that sprinkler was opened? | |
p_sprinkler_wet = float(df[(df['Sprinkler'] == 1) & (df['Grass Wet Probability'] > 0.5)].shape[0]) / df[df['Grass Wet Probability'] > 0.5].shape[0] | |
print(p_sprinkler_wet) | |
# Given sprinkler is off and it does not rain, what is the probability that grass is wet? | |
p_not_sprinkler_rain_wet = float(df[(df['Sprinkler'] == 0) & (df['Rain'] == 0) & (df['Grass Wet Probability'] > 0.5)].shape[0]) / df[df['Grass Wet Probability'] > 0.5].shape[0] | |
print(p_not_sprinkler_rain_wet) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import pandas as pd | |
import pymc3 as pm | |
niter = 10000 # 10000 | |
tune = 5000 # 5000 | |
model = pm.Model() | |
with model: | |
tv = [1] | |
rain = pm.Bernoulli('rain', 0.2, shape=1, testval=tv) | |
sprinkler_p = pm.Deterministic('sprinkler_p', pm.math.switch(rain, 0.01, 0.40)) | |
sprinkler = pm.Bernoulli('sprinkler', sprinkler_p, shape=1, testval=tv) | |
grass_wet_p = pm.Deterministic('grass_wet_p', pm.math.switch(rain, pm.math.switch(sprinkler, 0.99, 0.80), pm.math.switch(sprinkler, 0.90, 0.0))) | |
grass_wet = pm.Bernoulli('grass_wet', grass_wet_p, observed=np.array([1]), shape=1) | |
trace = pm.sample(20000, step=[pm.BinaryGibbsMetropolis([rain, sprinkler])], tune=tune, random_seed=124) | |
# pm.traceplot(trace) | |
dictionary = { | |
'Rain': [1 if ii[0] else 0 for ii in trace['rain'].tolist() ], | |
'Sprinkler': [1 if ii[0] else 0 for ii in trace['sprinkler'].tolist() ], | |
'Sprinkler Probability': [ii[0] for ii in trace['sprinkler_p'].tolist()], | |
'Grass Wet Probability': [ii[0] for ii in trace['grass_wet_p'].tolist()], | |
} | |
df = pd.DataFrame(dictionary) | |
p_rain = df[(df['Rain'] == 1)].shape[0] / df.shape[0] | |
print(p_rain) | |
p_sprinkler = df[(df['Sprinkler'] == 1)].shape[0] / df.shape[0] | |
print(p_sprinkler) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment