@strongh
Created July 19, 2016 22:07
toy example of consensus MCMC using (py)stan and (py)spark
// schools.stan

data {
  int<lower=0> J;          // number of schools
  real y[J];               // estimated treatment effects
  real<lower=0> sigma[J];  // s.e. of effect estimates
}
parameters {
  real mu;                 // population mean effect
  real<lower=0> tau;       // between-school s.d.
  real eta[J];             // per-school noise (non-centered)
}
transformed parameters {
  real theta[J];           // per-school treatment effects
  for (j in 1:J)
    theta[j] = mu + tau * eta[j];
}
model {
  eta ~ normal(0, 1);      // implies theta[j] ~ normal(mu, tau)
  y ~ normal(theta, sigma);
}
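
(For comparison, fitting this model on a single machine with all eight schools, and no Spark, would look roughly like the sketch below; the data literals are the same SCHOOL_DATA used later, everything else is illustrative.)

import pystan

schools_dat = {'J': 8,
               'y': [28, 8, -3, 7, -1, 1, 18, 12],
               'sigma': [15, 10, 16, 11, 9, 11, 10, 18]}
fit = pystan.stan(file="schools.stan", data=schools_dat,
                  iter=1000, chains=4)
print fit.extract()["mu"].mean()  # posterior mean of mu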
from pyspark import SparkContext, SparkConf
import pystan
import numpy as np

## TODO any advantage in compiling the stan model once? probably for
##      local execution, but maybe less so in distributed mode.
## TODO how to decide which parameters to aggregate? all? for multiple
##      parameters, make each parameter a key and use reduceByKey.
## TODO here I sometimes assume that the parameter theta is 1-dimensional.
## TODO how to get prior covariances?
## see section 4.1 of http://arxiv.org/pdf/1602.05221v2.pdf
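##
## Aside (a sketch of the math being approximated): consensus Monte Carlo
## combines draws from J subposteriors by a weighted average,
##   theta_bar_g = (sum_j W_j)^{-1} * (sum_j W_j * theta_{j,g}),
## where theta_{j,g} is draw g from subposterior j and W_j is typically
## the inverse sample covariance of subposterior j's draws. The 1/J term
## in the weights below appears to stand in for a unit-variance prior
## whose precision is split evenly across the J shards.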
appName = "ExampleConsensusMCMC"

SCHOOL_DATA = zip(
    [28, 8, -3, 7, -1, 1, 18, 12],    # y: estimated treatment effects
    [15, 10, 16, 11, 9, 11, 10, 18])  # sigma: standard errors
if __name__ == "__main__":

    def mcmc(sts):
        ## run MCMC on one partition of the data, returning
        ## draws of mu from that partition's subposterior
        sts = list(sts)
        schools_dat = {'J': len(sts),  # J is the partition size here, not 8!
                       'y': [d[0] for d in sts],
                       'sigma': [d[1] for d in sts]}
        fit = pystan.stan(file="schools.stan", data=schools_dat,
                          iter=1000, chains=4)
        ## mapPartitions expects an iterable of results
        return [np.array(fit.extract()["mu"])]
    def consensus_avg(J):
        ## reduce() applies this pairwise, which the consensus average
        ## was not designed for, so it's a bit ugly
        def c(f1, f2):
            if np.isnan(f1).any() and np.isnan(f2).any():
                return 0
            if np.isnan(f1).any():
                return f2
            if np.isnan(f2).any():
                return f1
            # I ignore the prior covariance and assume theta is 1-d
            # problem: need to know how many partitions there are
            sigma_j = np.zeros(2)
            fs = [f1, f2]
            for j in [0, 1]:
                sigma_j[j] = np.cov(fs[j])
            sigma = 1.0 / sum(1.0 / sigma_j)
            w_j = np.zeros(2)
            for j in [0, 1]:
                w_j[j] = sigma * (1.0 / J + 1.0 / sigma_j[j])  # 1.0/J avoids integer division
            return w_j[0] * f1 + w_j[1] * f2
        return c
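
    ## A non-pairwise alternative (a sketch, not used below): collect the
    ## J subposterior draws on the driver and weight them jointly, e.g.
    ## consensus_collect(subposteriors.collect()). As above, this ignores
    ## the prior covariance and assumes 1-d draws.
    def consensus_collect(draws):
        ## draws: list of J equal-length 1-d arrays of subposterior draws
        precisions = np.array([1.0 / np.cov(d) for d in draws])
        weights = precisions / precisions.sum()  # weights sum to 1
        return sum(w * d for w, d in zip(weights, draws))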
    conf = SparkConf().setAppName(appName)
    sc = SparkContext(conf=conf)

    ## run MCMC independently on each of 3 partitions of the data,
    ## then combine the subposterior draws with a pairwise reduce
    subposteriors = sc.parallelize(SCHOOL_DATA, 3).mapPartitions(mcmc)
    consensusSamples = subposteriors.reduce(
        consensus_avg(subposteriors.getNumPartitions()))

    print "----"
    print "Combined posterior samples:"
    print consensusSamples
    print "----"
@strongh (Author) commented Jul 26, 2016

In case anybody ever looks at this: I've started to turn this into more of a library: https://github.com/strongh/stark
