""" | |
Exercises for the Markov Chain Monte-Carlo (MCMC) course available at | |
http://users.aims.ac.za/~ioana/ | |
""" | |
import numpy as np | |
import numpy.linalg as la | |
import pylab | |
from scipy import stats | |
from scipy import linalg | |
""" | |
1. Markov Chains. | |
""" | |
K = np.array([[0.9, 0.1], [0.3, 0.7]]) | |
lambda_ = np.array([0.2, 0.8]) | |
class MarkovChain(object): | |
def __init__(self, K, lambda_): | |
self.K = K # transition matrix/kernel | |
self.lambda_ = lambda_ # initial state distribution | |
def m_step_kernel(self, m): | |
return np.array((np.matrix(self.K) ** m)) | |
def X_t(self, t): | |
return np.dot(lambda_, self.m_step_kernel(t)) | |
def invariant_distribution(self): | |
eigenvalues, eigenvectors = la.eig(self.K.T) | |
idx = eigenvalues.argsort()[::-1][0] | |
mu = eigenvectors[:,idx] | |
return mu/mu.sum() | |
def sample(self, n): | |
s = [np.random.multinomial(1, self.lambda_).argmax()] | |
for i in range(n-1): | |
s.append(np.random.multinomial(1, self.K[s[-1],:]).argmax()) | |
return s | |
mc = MarkovChain(K, lambda_) | |
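
# Optional sanity check (not part of the original exercise set): the invariant
# distribution mu returned above should satisfy mu K = mu.
def check_invariant_distribution():
    mu = mc.invariant_distribution()
    print np.allclose(np.dot(mu, mc.K), mu) # expected: True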

def ex1_1():
    """
    Compute the m-step kernel of K.
    """
    print mc.m_step_kernel(2)

def ex1_2():
    """
    Compute the distribution of X_t.
    """
    print mc.X_t(2)

def ex1_3():
    """
    Compute the invariant distribution.
    """
    print mc.invariant_distribution()

def ex1_4():
    """
    Plot the value of X_t over time to show that it converges
    to the invariant distribution.
    """
    distn = np.array([mc.X_t(t) for t in xrange(20)])
    pylab.figure()
    pylab.plot(distn[:, 0], 'bs-', distn[:, 1], 'rs-')
    pylab.show()

def ex1_5():
    """
    Plot a chain sample.
    """
    N = 100
    s = mc.sample(N)
    pylab.figure()
    pylab.plot(s, 'gs-')
    pylab.show()

def ex1_6():
    """
    Verify that the proportions of 0s and 1s in a sample are close to the
    invariant distribution.
    """
    N = 10000
    s = mc.sample(N)
    hist, bins = np.histogram(s, bins=2)
    print hist / (N * 1.0)

"""
2. Inverse cdf method and Rejection sampling.
"""

def sample_expo(n, lambda_):
    """
    The exponential distribution has cdf:
        F(x) = 1 - exp(-lambda_ * x)
    To sample from the exponential distribution:
        * generate a sample u from the uniform distribution
        * return F^-1(u) = -log(1-u)/lambda_
    """
    u = np.random.uniform(size=n)
    return -np.log(1 - u) / lambda_
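
# Optional sanity check (not part of the original exercise set): the sample
# mean of Expo(lambda_) should be close to 1/lambda_; the values of n and
# lambda_ below are arbitrary choices.
def check_sample_expo(n=100000, lambda_=2.0):
    print np.mean(sample_expo(n, lambda_)), "should be close to", 1.0 / lambda_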

def ex2_2():
    """
    Plot the histogram of data sampled with sample_expo and superimpose the
    density function of the exponential distribution to check that they have
    the same shape.
    """
    lambda_ = 2.0
    n = 100
    pylab.figure()
    pylab.hist(sample_expo(n, lambda_), normed=True)
    x = np.linspace(0, 5, n)
    pylab.plot(x, lambda_ * np.exp(-lambda_ * x))
    pylab.show()

def sample_gamma_integer(k, lambda_):
    """
    To sample from Gamma(k, lambda_), where k is an integer,
    we can take advantage of the fact that
        Y = X_1 + ... + X_k ~ Gamma(k, lambda_)
    when X_1, ..., X_k are i.i.d. and X_i ~ Expo(lambda_).
    """
    return np.sum(sample_expo(k, lambda_))

def gamma_pdf(x, alpha, lambda_):
    # pdf of Gamma(alpha, lambda_) in the shape/rate parameterization
    return stats.gamma.pdf(x * lambda_, alpha) * lambda_

def ex2_4():
    """
    Plot the proposal distribution in red and the distribution to sample from
    in blue to show that the blue curve is always under the red curve.
    """
    alpha = 5.7
    k = np.floor(alpha)
    lambda_ = 2.0
    # multiplicative factor M ensuring that M * proposal >= target everywhere;
    # the ratio target/proposal is maximized at x = alpha - k, so evaluating
    # it there gives the tightest valid bound
    M = gamma_pdf(alpha-k, alpha, lambda_) / gamma_pdf(alpha-k, k, lambda_-1)
    x = np.linspace(0, 10, 100)
    pylab.figure()
    pylab.plot(x, gamma_pdf(x, alpha, lambda_), 'b-')
    pylab.plot(x, M * gamma_pdf(x, k, lambda_-1), 'r:')
    pylab.show()

def sample_gamma(alpha, lambda_):
    """
    Sample from Gamma(alpha, lambda_) using rejection sampling.
    The proposal distribution is Gamma(k, lambda_-1) with k = floor(alpha),
    sampled with sample_gamma_integer.
    """
    k = np.floor(alpha)
    M = gamma_pdf(alpha-k, alpha, lambda_) / gamma_pdf(alpha-k, k, lambda_-1)
    while True:
        x = sample_gamma_integer(k, lambda_-1)
        prob_accept = gamma_pdf(x, alpha, lambda_) / \
                      (M * gamma_pdf(x, k, lambda_-1))
        u = np.random.random()
        if u <= prob_accept:
            return x
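
# Optional sanity check (not part of the original exercise set): the mean of
# Gamma(alpha, lambda_) is alpha/lambda_, so the sample mean of the rejection
# sampler should be close to it; the sample size below is arbitrary.
def check_sample_gamma(alpha=5.7, lambda_=2.0, n=2000):
    samples = [sample_gamma(alpha, lambda_) for i in xrange(n)]
    print np.mean(samples), "should be close to", alpha / lambda_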

def ex2_6():
    """
    Plot the theoretical density and the histogram obtained from sampling
    to show that they have the same shape.
    """
    alpha = 5.7
    lambda_ = 2.0
    x = [sample_gamma(alpha, lambda_) for i in xrange(1000)]
    pylab.figure()
    pylab.hist(x, normed=True)
    x = np.linspace(0, 10, 100)
    pylab.plot(x, gamma_pdf(x, alpha, lambda_))
    pylab.show()

"""
3. Importance sampling.
"""

# only the first 10 observations are considered labeled:
# y[0:5] belong to group 1 and y[5:10] to group 2
y = np.array([3, 6, 3, 5, 9, 14, 12, 11, 19, 18,
              15, 4, 1, 6, 11, 21, 11, 3, 7, 18])
alpha = 0.1
beta = 0.1

# The rejection sampler above could be used, but it is too slow:
#def sample_gamma_n(n, alpha, lambda_):
#    return np.array([sample_gamma(alpha, lambda_) for i in xrange(n)])

def sample_gamma_n(n, alpha, beta):
    # n draws from Gamma(alpha, beta): scale unit-rate gamma samples by 1/beta
    return stats.gamma(alpha).rvs(n) / beta

def sample_lambda(n, alpha_post_1, alpha_post_2, beta_post):
    # sample from f(lambda1|y1,...,y5)
    lambda_1 = sample_gamma_n(n, alpha_post_1, beta_post)
    # sample from f(lambda2|y6,...,y10)
    lambda_2 = sample_gamma_n(n, alpha_post_2, beta_post)
    weights = np.zeros(n)
    # compute the importance weight of each sample: the likelihood of the
    # unlabeled data under an equal-weight mixture of the two Poissons
    for i in xrange(n):
        weights[i] = np.prod(0.5 * stats.poisson(lambda_1[i]).pmf(y[10:20]) +
                             0.5 * stats.poisson(lambda_2[i]).pmf(y[10:20]))
    return weights, lambda_1, lambda_2
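
# Optional diagnostic (not part of the original exercise set): the effective
# sample size of the importance weights, (sum w)^2 / sum w^2, indicates how
# many of the n draws effectively contribute to the weighted estimates.
def importance_sampling_ess(weights):
    return np.sum(weights) ** 2 / np.sum(weights ** 2)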

def ex3_4():
    """
    Find the parameters lambda1 and lambda2 of two groups
    having a Poisson distribution.
    The proposal distribution used is the posterior fitted on the labeled
    data only.
    """
    # by conjugacy of the Gamma prior with the Poisson likelihood, the
    # posterior is a Gamma distribution with the following parameters
    alpha_post_1 = alpha + np.sum(y[0:5])
    alpha_post_2 = alpha + np.sum(y[5:10])
    beta_post = beta + 5
    # the expectation of Gamma(alpha, beta) is alpha/beta (shape/rate)
    post_mean_1_labeled = alpha_post_1 / beta_post
    post_mean_2_labeled = alpha_post_2 / beta_post
    weights, lambda_1, lambda_2 = sample_lambda(1000, alpha_post_1,
                                                alpha_post_2, beta_post)
    # compute the expectation (weighted average)
    post_mean_1_all = np.sum(lambda_1 * weights) / np.sum(weights)
    post_mean_2_all = np.sum(lambda_2 * weights) / np.sum(weights)
    print "Labeled", post_mean_1_labeled, post_mean_2_labeled
    print "All", post_mean_1_all, post_mean_2_all

def kde(data, newpoints, weights, h=1.0):
    """
    Weighted Kernel Density Estimation with a Gaussian kernel of bandwidth h.
    """
    weights = weights / np.sum(weights) * len(weights)
    def K(x, xi):
        return 1/np.sqrt(2*np.pi) * np.exp(-(x-xi)**2/(2*h**2))
    def f(x, xi):
        n = len(xi)
        return 1/(n*h) * np.sum(weights * K(x, xi))
    return np.array([f(x, data) for x in newpoints])

def ex3_5():
    """
    Plot the posterior densities of lambda1 and lambda2 with and without the
    unlabeled data. Kernel Density Estimation is used for the former in order
    to evaluate the weighted-sample density at new points.
    """
    alpha_post_1 = alpha + np.sum(y[0:5])
    alpha_post_2 = alpha + np.sum(y[5:10])
    beta_post = beta + 5
    weights, lambda_1, lambda_2 = sample_lambda(1000, alpha_post_1,
                                                alpha_post_2, beta_post)
    t = np.linspace(0, 20, 1000)
    lambda_1_density = kde(lambda_1, t, weights)
    pylab.figure(1)
    pylab.plot(t, lambda_1_density, 'r-')
    pylab.plot(t, gamma_pdf(t, alpha_post_1, beta_post), 'g:')
    pylab.show()
    lambda_2_density = kde(lambda_2, t, weights)
    pylab.figure(2)
    pylab.plot(t, lambda_2_density, 'r-')
    pylab.plot(t, gamma_pdf(t, alpha_post_2, beta_post), 'g:')
    pylab.show()

def ex3_6():
    """
    Find the probability of belonging to group 1 for each of the unlabeled
    examples (11 to 20).
    """
    alpha_post_1 = alpha + np.sum(y[0:5])
    alpha_post_2 = alpha + np.sum(y[5:10])
    beta_post = beta + 5
    n = 1000
    weights, lambda_1, lambda_2 = sample_lambda(n, alpha_post_1,
                                                alpha_post_2, beta_post)
    # for each of the n samples of (lambda1, lambda2), compute the
    # probability that each unlabeled observation belongs to group 1
    allocations = np.zeros((n, 10))
    for i in xrange(n):
        pmf_1 = stats.poisson(lambda_1[i]).pmf(y[10:20])
        pmf_2 = stats.poisson(lambda_2[i]).pmf(y[10:20])
        allocations[i, :] = pmf_1 / (pmf_1 + pmf_2)
    # then take the expectation over the n samples with a weighted average,
    # using the weights obtained by importance sampling
    # note: the weights are associated with the n samples of lambda,
    # not with the 10 unlabeled data points!
    prob_of_group_1 = np.zeros(10)
    for i in range(10):
        prob_of_group_1[i] = np.sum(allocations[:, i] * weights) / np.sum(weights)
    print prob_of_group_1
""" | |
4. Gibbs sampling. | |
""" | |
def sample_conditional(x2, mu1, mu2, sigma1_2, sigma2_2, sigma12): | |
mu = mu1 + (sigma12/sigma2_2) * (x2-mu2) | |
sigma_2 = sigma1_2 - (sigma12**2)/sigma2_2 | |
return np.random.normal(mu, np.sqrt(sigma_2)) # return x1 | |
def gibbs_sampler(n, mu, cov): | |
""" | |
Perform Gibbs sampling for a bivariate normal distribution. | |
mu = [mu1, mu2] | |
cov [[sigma1**2,sigma12],[sigma21, sigma2**2]] | |
""" | |
x1 = np.zeros(n) | |
x2 = np.zeros(n) | |
# initialization | |
x1[0] = np.random.normal(mu[0], np.sqrt(cov[0,0])) | |
x2[0] = np.random.normal(mu[1], np.sqrt(cov[1,1])) | |
for t in range(1,n): | |
x1[t] = sample_conditional(x2[t-1], mu[0], mu[1], cov[0,0], cov[1,1], | |
cov[0,1]) | |
x2[t] = sample_conditional(x1[t], mu[1], mu[0], cov[1,1], cov[0,0], | |
cov[1,0]) | |
return x1, x2 | |
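
# Optional sketch (not part of the original exercise set): in practice the
# first iterations of a Gibbs chain are often discarded as burn-in before
# computing estimates; the burn-in length below is an arbitrary choice.
def gibbs_estimates_with_burn_in(n=3000, burn_in=500):
    mu = np.array([0, 0])
    cov = np.array([[4, 2.8], [2.8, 4]])
    x1, x2 = gibbs_sampler(n, mu, cov)
    data = np.array([x1[burn_in:], x2[burn_in:]])
    print np.mean(data, axis=1) # should be close to mu
    print np.cov(data, rowvar=1) # should be close to cov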

def ex4_2():
    """
    Plot (x1, x2), (t, x1) and (t, x2).
    """
    mu = np.array([0, 0])
    # cov = np.array([[4, 1], [1, 4]])
    cov = np.array([[4, 2.8], [2.8, 4]])
    n = 3000
    x1, x2 = gibbs_sampler(n, mu, cov)
    pylab.figure(0)
    pylab.plot(x1[2900:], x2[2900:])
    pylab.figure(1)
    pylab.plot(x1, 'b')
    pylab.figure(2)
    pylab.plot(x2, 'r')
    pylab.show()
    data = np.array([x1, x2])
    # sample mean and covariance matrix
    print np.mean(data, axis=1)
    print np.cov(data, rowvar=1)

def ex4_3():
    """
    Plot the running estimate of P(x1 >= 0 and x2 >= 0) to show that it
    converges as t increases.
    """
    mu = np.array([0, 0])
    # cov = np.array([[4, 1], [1, 4]])
    cov = np.array([[4, 2.8], [2.8, 4]])
    n = 3000
    x1, x2 = gibbs_sampler(n, mu, cov)
    prob = np.zeros(n)
    count = int(x1[0] >= 0 and x2[0] >= 0)
    prob[0] = count
    for t in range(1, n):
        count += int(x1[t] >= 0 and x2[t] >= 0)
        prob[t] = count * 1.0 / (t + 1)
    pylab.figure(3)
    pylab.plot(prob)
    pylab.axis([-200, n+200, None, None])
    pylab.show()
""" | |
4. Metropolis-Hasting. | |
""" | |
def normal_pdf(x, mu, sigma): | |
inv_sigma = linalg.inv(sigma) | |
x_minus_mu = x-mu | |
return np.exp(-0.5*np.dot(np.dot(x_minus_mu.T,inv_sigma),x_minus_mu))/ \ | |
(2*np.pi*np.sqrt(linalg.det(sigma))) | |
def mh_sampling(n, mu, sigma, mu_prop=0, sigma_prop=2.5): | |
""" | |
Metropolis-Hasting sampling with a symmetric proposal distribution (aka | |
Metropolis sampling) for the bivariate gaussian. | |
""" | |
x = np.zeros((n,2)) | |
x[0,:] = np.array([0,0]) # arbitrary initial values | |
accepted_n = 0 | |
for t in xrange(1,n): | |
# can sample 2 iid samples from a univariate normal distribution | |
# since the covariance matrix of the proposal distribution has zero | |
# for non-diagonal values | |
epsilon = np.random.normal(mu_prop, sigma_prop, size=2) | |
x_new = x[t-1,:] + epsilon | |
# for code clarity normal_pdf is recomputed at every iteration but it | |
# could be saved | |
p_accept = min(1.0, normal_pdf(x_new, mu, sigma) / normal_pdf(x[t-1,:], mu, sigma)) | |
if np.random.random() < p_accept: | |
accepted_n += 1 | |
x[t,:] = x_new | |
else: | |
x[t,:] = x[t-1,:] | |
print "The proportion of accepted values is", accepted_n*1.0/n | |
return x | |
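
# Optional sketch (not part of the original exercise set): the proposal scale
# sigma_prop controls the acceptance rate; rates roughly between 0.2 and 0.5
# are commonly recommended for random-walk Metropolis. The scales tried below
# are arbitrary choices; mh_sampling prints the acceptance proportion itself.
def compare_proposal_scales():
    mu = np.array([0, 0])
    sigma = np.array([[4, 1], [1, 4]])
    for sigma_prop in (0.5, 2.5, 10.0):
        print "sigma_prop =", sigma_prop
        mh_sampling(1000, mu, sigma, sigma_prop=sigma_prop)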

def autocorrelation(x):
    # lag-1 autocorrelation of the chain
    return np.corrcoef(x[1:], x[:-1])[0, 1]

def ex5_3():
    """
    Sampling from a bivariate normal distribution using Metropolis-Hastings.
    """
    mu = np.array([0, 0])
    sigma = np.array([[4, 1], [1, 4]])
    # sigma = np.array([[4, 2.8], [2.8, 4]])
    n = 1000
    x = mh_sampling(n, mu, sigma, sigma_prop=2.5)
    pylab.figure(0)
    pylab.plot(x[:, 0], 'b')
    pylab.title("Sample path of X_1")
    pylab.figure(1)
    pylab.plot(x[:, 1], 'r')
    pylab.title("Sample path of X_2")
    # Plot the running mean to show that it converges to mu
    x1_cummean = np.cumsum(x[:, 0]) / (1 + np.arange(n))
    x2_cummean = np.cumsum(x[:, 1]) / (1 + np.arange(n))
    pylab.figure(2)
    pylab.plot(x1_cummean, "b")
    pylab.title("Empirical mean of X_1")
    pylab.figure(3)
    pylab.plot(x2_cummean, "r")
    pylab.title("Empirical mean of X_2")
    pylab.show()
    # Autocorrelation
    x1_sd = np.sqrt(np.var(x[:, 0]))
    x2_sd = np.sqrt(np.var(x[:, 1]))
    x1_autocorr = autocorrelation(x[:, 0])
    x2_autocorr = autocorrelation(x[:, 1])
    print "The autocorrelation of X_1 is", x1_autocorr
    print "The autocorrelation of X_2 is", x2_autocorr
    # Effective sample size, using the AR(1) approximation n*(1-rho)/(1+rho)
    x1_ess = n * (1 - x1_autocorr) / (1 + x1_autocorr)
    x2_ess = n * (1 - x2_autocorr) / (1 + x2_autocorr)
    print "The effective sample size of X_1 is", x1_ess
    print "The effective sample size of X_2 is", x2_ess

if __name__ == "__main__":
    import sys
    import __main__
    getattr(__main__, "ex" + str(sys.argv[1]))()