A Bayesian neural network built with the PyMC3 library, using residual blocks and dropout.
import pickle

import numpy as np
import theano
import pymc3 as pm


def relu(x):
    return pm.math.maximum(0, x)

def residual_block(X, input_dim, hidden_dim,
                   prior_mean, prior_sd,
                   init_val, i, dropout_prob):
    # 1st layer: weights with a prior centred on the data statistics
    wts_in_1 = pm.Normal(
        "wts_in_1" + i,
        mu=prior_mean, sd=prior_sd,
        shape=(input_dim, hidden_dim),
        testval=init_val  # pymc3's keyword for the starting value (initval in pymc >= 4)
    )
    # gate the weights with a 0/1 switch derived from the learned dropout probability
    dropout_wts_in_1 = pm.Deterministic(
        "dropout_wts_in_1" + i,
        wts_in_1 * pm.math.switch(
            pm.math.floor(pm.math.invlogit(dropout_prob)), 0, 1))
    act_1 = relu(pm.math.dot(X, dropout_wts_in_1))
    # 2nd layer
    wts_1_2 = pm.Normal(
        "wts_1_2" + i,
        mu=0., sd=1.,
        shape=(hidden_dim, hidden_dim))
    dropout_wts_1_2 = pm.Deterministic(
        "dropout_wts_1_2" + i,
        wts_1_2 * pm.math.switch(
            pm.math.floor(pm.math.invlogit(dropout_prob)), 0, 1))
    act_2 = relu(pm.math.dot(act_1, dropout_wts_1_2))
    # 3rd layer
    wts_2_3 = pm.Normal(
        "wts_2_3" + i,
        mu=0., sd=1.,
        shape=(hidden_dim, hidden_dim))
    dropout_wts_2_3 = pm.Deterministic(
        "dropout_wts_2_3" + i,
        wts_2_3 * pm.math.switch(
            pm.math.floor(pm.math.invlogit(dropout_prob)), 0, 1))
    act_3 = relu(pm.math.dot(act_2, dropout_wts_2_3))
    # output/final layer of the block, projecting back to input_dim
    wts_3_out = pm.Normal("wts_3_out" + i, mu=0., sd=1.,
                          shape=(hidden_dim, input_dim))
    act_out = relu(pm.math.dot(act_3, wts_3_out))
    # add the residual (skip) connection
    residual = X + act_out
    return residual

def construct_bnn(X, y, n_hidden=15, n_residuals=3):
    floatX = theano.config.floatX
    dims = X.shape[1]
    nrows = X.shape[0]
    mean_X = np.mean(X, axis=0).reshape(-1, 1)
    sd_X = np.std(X, axis=0).reshape(-1, 1)
    # init value for wts_in_1, drawn around the empirical mean/sd of X
    init_1 = mean_X + sd_X * np.random.randn(dims, n_hidden).astype(floatX)
    # init values for the weights of the layers after the residual blocks
    init_4 = np.random.randn(dims, n_hidden).astype(floatX)
    init_out = np.random.randn(n_hidden).astype(floatX)
    # initiate the pymc3 model
    with pm.Model() as bayesian_nn:
        X = pm.Data("X", X)
        x_for_residual = X
        y = pm.Data("y", y)
        dropout_prob = pm.Uniform("dropout_prob", lower=0., upper=1.)
        # stack of residual blocks
        for i in range(n_residuals):
            x_for_residual = residual_block(
                x_for_residual, dims, n_hidden, mean_X, sd_X,
                init_1, str(i), dropout_prob)
        # more layers after the residual blocks,
        # connected to the output of the last block (shape: nrows x dims)
        wts_3_4 = pm.Normal("wts_3_4", mu=0., sd=1.,
                            shape=(dims, n_hidden),
                            testval=init_4)
        bias_3_4 = pm.Normal("bias_3_4", mu=0., sd=1., shape=(n_hidden,))
        dropout_wts_3_4 = pm.Deterministic(
            "dropout_wts_3_4",
            wts_3_4 * pm.math.switch(
                pm.math.floor(pm.math.invlogit(dropout_prob)), 0, 1))
        act_4 = relu(pm.math.dot(x_for_residual, dropout_wts_3_4) + bias_3_4)
        # last layer to output
        wts_4_out = pm.Normal("wts_4_out", mu=0., sd=1.,
                              shape=(n_hidden,),
                              testval=init_out)
        bias_4_out = pm.Normal("bias_4_out", mu=0., sd=1., shape=(1,))
        act_out = pm.Deterministic(
            "probability",
            pm.math.sigmoid(pm.math.dot(act_4, wts_4_out) + bias_4_out))
        # binary classification (Bernoulli likelihood)
        output = pm.Bernoulli("output", act_out, observed=y, total_size=nrows)
    return bayesian_nn

# dummy data: two Gaussian clusters, one per class
np.random.seed(13)
num_samples = 100
X0 = np.random.randn(num_samples // 2, 2) + np.array([-2, -2])
X1 = np.random.randn(num_samples // 2, 2) + np.array([2, 2])
X = np.vstack((X0, X1))
y = np.concatenate((np.zeros(num_samples // 2), np.ones(num_samples // 2)))
shuffle_indices = np.random.permutation(num_samples)
X = X[shuffle_indices]
y = y[shuffle_indices]

bayesian_nn = construct_bnn(X, y)

n = 15000
# run mean-field ADVI
with bayesian_nn:
    infer = pm.ADVI()
    approx = pm.fit(method=infer, n=n,
                    obj_optimizer=pm.adam(learning_rate=0.005),
                    test_optimizer=pm.adam(learning_rate=0.0001),
                    random_seed=1311)
# sample from the approximate posterior
trace = approx.sample(draws=200)
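
# (optional) a minimal sketch of checking ADVI convergence: `approx.hist`
# stores the tracked objective (negative ELBO) per iteration. Plotting it
# assumes matplotlib is available (it is not used elsewhere in this gist).
import matplotlib.pyplot as plt
plt.plot(approx.hist)
plt.xlabel("iteration")
plt.ylabel("objective (-ELBO)")
plt.show()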

# predict using posterior predictive samples
with bayesian_nn:
    # N.B. pymc3 uses shared variables to keep track of the input and output,
    # so here it samples using the shared variable X that was used for training.
    # To use the network to predict for new data, first point the shared
    # variable X at the new data with:
    # pm.set_data({"X": new_data_array})
    ppc = pm.sample_posterior_predictive(trace, progressbar=False,
                                         var_names=["probability"])
# the mean (or median) over draws gives an estimate of the predicted probability
prediction = ppc["probability"].mean(axis=0)
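
# a minimal sketch of predicting for unseen data, following the note above;
# `new_X` is a hypothetical array with the same number of features as the
# training data (2 columns), not part of the original gist. Only the
# deterministic "probability" is sampled, so the observed y is left untouched.
new_X = np.random.randn(10, 2)
with bayesian_nn:
    pm.set_data({"X": new_X})
    ppc_new = pm.sample_posterior_predictive(trace, progressbar=False,
                                             var_names=["probability"])
new_prediction = ppc_new["probability"].mean(axis=0)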

# to save the model and trace
with open("bayesian_nn.pkl", "wb") as f:
    pickle.dump({"model": bayesian_nn, "trace": trace}, f)

# to load them back
with open("bayesian_nn.pkl", "rb") as f:
    data = pickle.load(f)
loaded_model, loaded_trace = data["model"], data["trace"]
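
# a minimal sketch of reusing the loaded model and trace for prediction;
# it follows the same workflow as above (set the shared "X" first when
# predicting for new data).
with loaded_model:
    ppc_loaded = pm.sample_posterior_predictive(loaded_trace, progressbar=False,
                                                var_names=["probability"])
loaded_prediction = ppc_loaded["probability"].mean(axis=0)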