import emcee
from sklearn.mixture import GaussianMixture
import vegas
from astropy.cosmology import Planck15
import numpy as np
from scipy import stats
from astropy import units as u
import matplotlib.pyplot as plt

jy_factor = (u.W / u.Hz / u.m / u.m).to(u.Jy)
whz_factor = (u.Jy * u.m * u.m).to(u.W / u.Hz)
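
# 1 Jy = 1e-26 W m^-2 Hz^-1: jy_factor converts a flux density in W/Hz/m^2 to Jy
# and whz_factor converts Jy * m^2 to W/Hz, using astropy units rather than a
# hard-coded 1e-26.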

class Integrator:
    def __init__(self, logm_observed, logm_error, f_observed, f_error, z_observed, z_error,
                 mu, V,
                 logl_range, logm_range,
                 z_range=(0.0001, 1.), zpoints=1000, n_adapt_samples=1000,
                 zdeg=2, cosmo=Planck15, alpha=-0.71):
        self.f_observed = f_observed
        self.f_error = f_error
        self.z_observed = z_observed
        self.z_error = z_error
        self.logm_observed = logm_observed
        self.logm_error = logm_error
        self.mu = mu
        self.V = V
        self.gaussian_component = stats.multivariate_normal(mu, V)
        if z_range[0] <= 0:
            raise ValueError("minimum z must be greater than zero")
        self.z_range = z_range
        self.alpha = alpha
        # approximate the luminosity distance (in metres) with a degree-`zdeg`
        # polynomial over z_range so it is cheap to evaluate inside the integrand
        _zs = np.linspace(*z_range, num=zpoints)
        _dls = cosmo.luminosity_distance(_zs).to(u.m).value
        self.dl_params = np.polyfit(_zs, _dls, zdeg)
        self.logm_range = logm_range
        self.logl_range = logl_range
        self.adaptive_map = vegas.AdaptiveMap([logm_range, logl_range, z_range])  # uniform map
        self.logm_dist = stats.norm(self.logm_observed, self.logm_error)
        self.f_dist = stats.norm(self.f_observed, self.f_error)
        self.z_dist = stats.norm(self.z_observed, self.z_error)
        self._samples = self.generate_map_samples(n_adapt_samples)
    def logdl(self, z):
        """log10 of the luminosity distance in metres (polynomial approximation)"""
        return np.log10(np.polyval(self.dl_params, z))
    def logl_to_f(self, logl, z):
        """convert log10 luminosity [W/Hz] to flux density in Jy (1 Jy = 1e-26 W/m^2/Hz)"""
        return (10**(logl - np.log10(4 * np.pi) - (2 * self.logdl(z)) + ((1 + self.alpha) * np.log10(1 + z)))) * jy_factor

    def f_to_logl(self, f, z):
        """convert flux density in Jy to log10 luminosity [W/Hz]"""
        return np.log10(f * whz_factor * 4 * np.pi) + (2 * self.logdl(z)) - ((1 + self.alpha) * np.log10(1 + z))
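
    # The two converters above implement the usual relation
    #     f = L * (1 + z)**(1 + alpha) / (4 * pi * d_L**2),
    # with L in W/Hz, d_L in metres and f in Jy; the (1 + z)**(1 + alpha) term is
    # the K-correction for an assumed power-law spectrum f_nu ~ nu**alpha.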
    def invdetjacobian(self, f, z):
        """
        Jacobian factor for the change of variables P(f, z) -> P(log10[L], z).
        Since log10[L] = log10(f) + g(z), d(log10[L])/df = 1 / (f ln10), so the
        Jacobian matrix is
            [[1 / (f ln10), d(log10[L])/dz],
             [0,            1             ]]
        and 1 / abs(determinant) = f * ln(10), which converts a density in f
        into a density in log10[L].
        """
        return f * np.log(10)

    def logprob_lz(self, true_logl, true_z):
        true_f = self.logl_to_f(true_logl, true_z)
        return self.f_dist.logpdf(true_f) + np.log(self.invdetjacobian(true_f, true_z))
    def logprob_observed_given_true(self, true_m, true_logl, true_z):
        logl_part = self.logprob_lz(true_logl, true_z)
        z_part = self.z_dist.logpdf(true_z)
        m_part = self.logm_dist.logpdf(true_m)
        return logl_part + z_part + m_part

    def logprob_gaussian_component(self, true_mlogl):
        return self.gaussian_component.logpdf(true_mlogl)

    def logintegrand(self, x):
        # x has shape (n, 3) with columns (logM, logL, z); invalid (z < 0 or
        # otherwise non-finite) points get zero probability
        r = np.where(x[:, 2] >= 0, self.logprob_observed_given_true(x[:, 0], x[:, 1], x[:, 2]) +
                     self.logprob_gaussian_component(x[:, :2]), -np.inf)
        r[~np.isfinite(r)] = -np.inf
        return r
    def sample_true_given_observed(self, n):
        """draw (logM, logL, z) points from the observational error distributions"""
        zs = self.z_dist.rvs(n)
        fs = self.f_dist.rvs(n)
        logms = self.logm_dist.rvs(n)
        logls = self.f_to_logl(fs, zs)
        r = np.stack([logms, logls, zs]).T
        return r[np.isfinite(r).all(axis=1)]

    def generate_map_samples(self, ninit, nwalkers=100, chain=200, burn=100):
        # seed points: draws around the observations plus draws from the Gaussian
        # component, augmented with samples from a single-component GMM fit to both
        comp_samples = np.concatenate([self.gaussian_component.rvs(ninit), self.z_dist.rvs((ninit, 1))], axis=1)
        samples = np.concatenate([self.sample_true_given_observed(ninit), comp_samples])
        gmm = GaussianMixture(1).fit(samples)
        samples = np.concatenate([samples, gmm.sample(ninit)[0]])
        ndim = 3
        # run a short MCMC over the integrand so the vegas map is adapted to where
        # the integrand actually has support
        p0 = samples.reshape(-1, ndim)[:nwalkers, :]
        nwalkers = p0.shape[0]
        sampler = emcee.EnsembleSampler(nwalkers, ndim, self.logintegrand, vectorize=True)
        sampler.run_mcmc(p0, chain)
        return sampler.chain[:, burn:].reshape(-1, ndim)
    def integrate(self, neval=1e5, nitn=10, nadapt=10, alpha=0.1):
        # evaluate the log-integrand at the preconditioning samples and shift it so
        # that exponentiation does not underflow
        x, y = self._samples, self.logintegrand(self._samples)
        adjustment = -np.mean(y)
        y = np.exp(y + adjustment)
        self.adaptive_map.adapt_to_samples(x, y, nitn=nadapt)  # precondition map
        integ = vegas.Integrator(self.adaptive_map, alpha=alpha)
        func = vegas.batchintegrand(lambda x: np.exp(self.logintegrand(x) + adjustment))
        # the estimate is of exp(adjustment) times the true integral, so return the
        # log-space shift alongside it so callers can undo it
        return integ(func, neval=neval, nitn=nitn), adjustment
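
    # integrate() returns (vegas result, log-space adjustment): the log of the
    # un-shifted integral is np.log(result) - adjustment, as in the __main__ block.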
    def plot_result(self, r):
        plt.figure()
        plt.errorbar(np.arange(len(r.itn_results)), *zip(*[(i.mean, i.sdev) for i in r.itn_results]))
        plt.axhspan(r.mean - r.sdev, r.mean + r.sdev, alpha=0.2)

    def plot_plane(self, log=False, slices=(slice(8, 12, 0.05), slice(19, 22, 0.05))):
        plt.figure()
        grid = np.mgrid[slices[0], slices[1], self.z_observed:self.z_observed + 0.001:0.001][..., :1]
        r = self.logintegrand(grid.reshape(grid.shape[0], -1).T).T.reshape(grid.shape[1:])
        if not log:
            r = np.exp(r)
        plt.pcolormesh(grid[0, ..., 0], grid[1, ..., 0], r[..., 0])
        plt.colorbar()
        m, l = self.sample_true_given_observed(10000)[:, :2].T
        h, xedges, yedges = np.histogram2d(m, l, bins=10)
        xbins = xedges[:-1] + (xedges[1] - xedges[0]) / 2
        ybins = yedges[:-1] + (yedges[1] - yedges[0]) / 2
        h = h.T
        plt.contour(xbins, ybins, h, levels=10, cmap='Reds')
        gaussian_prob = np.exp(self.logprob_gaussian_component(grid[:2, ..., 0].reshape(2, -1).T).T.reshape(grid.shape[1:-1]))
        plt.contour(grid[0, ..., 0], grid[1, ..., 0], gaussian_prob, cmap='Greys')
        plt.xlabel('logM')
        plt.ylabel('logL')
        plt.xlim(slices[0].start, slices[0].stop)
        plt.ylim(slices[1].start, slices[1].stop)
    def plot_luminosity_distribution(self, z=None):
        plt.figure()
        z = self.z_observed if z is None else z
        ls = np.linspace(*self.logl_range, num=1000)
        lnp = self.logprob_lz(ls, z)
        plt.plot(ls, np.exp(lnp))
        plt.xlabel('logL')
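

# Illustrative sanity check, not part of the original gist: at fixed redshift,
# logl_to_f and f_to_logl should invert each other exactly (the Jy <-> W/Hz
# factors cancel). The name check_flux_luminosity_roundtrip is ad hoc and only
# used in the demo block below.
def check_flux_luminosity_roundtrip(integrator, n=1000, seed=0):
    rng = np.random.default_rng(seed)
    logls = rng.uniform(*integrator.logl_range, size=n)
    zs = np.full(n, integrator.z_observed)  # stay at the observed redshift
    recovered = integrator.f_to_logl(integrator.logl_to_f(logls, zs), zs)
    return np.allclose(recovered, logls)
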

if __name__ == '__main__':
    mu = [11, 20.8]
    cov = np.eye(2) * 0.01
    f, ferr, z, zerr = 1e-4, 1e-3, 0.085, 0.0001
    m, merr = 11, 0.1
    # logprobs = []
    # ferrs = [1e-5, 1e-4, 1e-3, 1e-2, 1e-1]
    # for ferr in ferrs:
    #     integrator = Integrator(m, merr, f, ferr, z, zerr, mu, cov, [0, 50], [0, 50])
    #     r, adjustment = integrator.integrate()
    #     logprobs.append(np.log(r.val) - adjustment)
    # plt.plot(ferrs, logprobs)
    integrator = Integrator(m, merr, f, ferr, z, zerr, mu, cov, [0, 50], [0, 50])
    r, adjustment = integrator.integrate()
    # print(f'result = {r / np.exp(adjustment)} adjustment=exp({adjustment})')
    # print(r.summary())
    print(np.log(r) - adjustment)
    integrator.plot_plane()
    integrator.plot_luminosity_distribution()
    integrator.plot_result(r)
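
    # Illustrative extra diagnostics, not in the original script: confirm the
    # flux <-> luminosity conversion round-trips and print the per-iteration
    # vegas summary before the figures are shown.
    print('flux/luminosity round-trip ok:', check_flux_luminosity_roundtrip(integrator))
    print(r.summary())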
    plt.show()