Last active
February 23, 2018 15:00
-
-
Save ipashchenko/1ffb8710d2de3ddaf1b4514132927275 to your computer and use it in GitHub Desktop.
Empirical Bayes with RQ+WN kernel
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import george | |
import pickle | |
from functools import partial | |
import scipy.optimize as op | |
import matplotlib.pyplot as plt | |
def is_embraced(curve, means, widths): | |
""" | |
Function that checks if 1D curve is embraced by 1D band. | |
:param curve: | |
Iterable of curve's points. | |
:param means: | |
Iterable of band's means. | |
:param widths: | |
Iterable of widths (sigmas). | |
:return: | |
``True`` if curve is embraced by band. | |
""" | |
curve = np.array(curve) | |
means = np.array(means) | |
widths = np.array(widths) | |
assert len(curve) == len(means) == len(widths) | |
diff = np.abs(curve - means) | |
return np.alltrue(np.nan_to_num(diff) <= np.nan_to_num(widths)) | |
def count_contained(curves, means, widths): | |
""" | |
Count haw many curves are contained inside given band. | |
:param curves: | |
Iterable of numpy 1D arrays with curves to count. | |
:param means: | |
Iterable of band's means. | |
:param widths: | |
Iterable of widths (sigmas). | |
:return: | |
Number of curves within band. | |
""" | |
i = 0 | |
for curve in curves: | |
if is_embraced(curve, means, widths): | |
i += 1 | |
return i | |
def calculate_scb(curves, mu, std, alpha=0.05, delta=0.01): | |
""" | |
Calculate simulataneous confidence band. | |
:param curves: | |
Samples of GP. | |
:param mu: | |
Mean values. | |
:param std: | |
STD values. | |
:param alpha: (optional) | |
Significance level of a corresponding hypothesis test. (default: | |
``0.05``) | |
:param delta: (optional) | |
Step of widening of band. (default: ``0.01``) | |
:return: | |
Two 1D numpy arrays with low and upper alpha-level simultaneous | |
confidence bands. | |
""" | |
n = count_contained(curves, mu, std) | |
f = 1 | |
while n < len(curves) * alpha: | |
f += delta | |
n = count_contained(curves, mu, f*std) | |
return mu - f*std, mu + f*std | |
def find_maxima(samples, mu, std, grad_samples, alpha=0.05): | |
""" | |
Find position of maxima using samples from GP. | |
:param samples: | |
Iterable of arrays. Conditional samples from GP (with fixed HP in | |
empirical Bayes and HP from their posterior in full Bayes). | |
:param mu: | |
Array of mean values. | |
:param grad_samples: | |
Iterable of arrays. Gradients of ``samples``. | |
:param alpha: (optional) | |
Significance level of a corresponding hypothesis test. (default: | |
``0.05``) | |
:return: | |
1D array of positions and 2D array of errors (low, upper) of maximum | |
positions. | |
""" | |
grad_samples = np.atleast_2d(grad_samples) | |
mu_grad = np.gradient(mu) | |
std_grad = np.array([np.std(grad_samples[:, i]) for i in range(len(mu))]) | |
# Low and up boundaries of SCB for gradients | |
low, up = calculate_scb(grad_samples, mu_grad, std_grad) | |
# Define the objective function (negative log-likelihood in this case). | |
def nll(p, y, gp): | |
""" | |
:param p: | |
Mean, log(Disp_WN), log(A**2), log(alpha), log(scale**2) | |
""" | |
gp.set_parameter_vector(p) | |
ll = gp.log_likelihood(y, quiet=True) | |
return -ll if np.isfinite(ll) else 1e25 | |
# And the gradient of the objective function. | |
def grad_nll(p, y, gp): | |
""" | |
:param p: | |
Mean, log(Disp_WN), log(A**2), log(alpha), log(scale**2) | |
""" | |
gp.set_parameter_vector(p) | |
return -gp.grad_log_likelihood(y, quiet=True) | |
def optimize_gp(t, y, yerr, p0=None, t_new=None, n_new=None, show_fig=False, | |
file_fig=None, color="#1f77b4", fig=None, label=None): | |
""" | |
Optimize GP hyperparameters for given data. | |
:param t: | |
Array of times. | |
:param y: | |
Array of the observed values. | |
:param yerr: | |
Array of the observed errors. | |
:param p0: (optional) | |
Initial values for the hyperparameters ("Amp", "log_alpha", "tau", | |
"wn_disp"). | |
:param n_new: (optional) | |
Number of new points where to sample fitted GP. There will be ``n_new`` | |
points evently distributed in ``(t[0], t[-1])`` interval. If ``None`` | |
then time points where to sample fitted GP must be specified in | |
``t_new`` argument. (default: ``None``) | |
:param t_new: (optional) | |
Array of times where to sample fitted GP. This allows more efficiently | |
span time intervals near flare maximum. If ``None`` then time points | |
where to sample fitted GP must be specified by ``n_new`` argument. | |
(default: ``None``) | |
:param show_fig: (optional) | |
Show figure? (default: ``False``) | |
:param file_fig: (optional) | |
File to save picture of fitted GP. If ``None`` then don't save picture. | |
(default: ``None``) | |
:param color: (optional) | |
Color for plots. If ``None`` then use standard. (default: ``None``) | |
:param fig: (optional) | |
Figure to plot. If ``None`` then create one. (default: ``None``) | |
:param label: (optional) | |
Label for the plot. If ``None`` then don't label. (default: ``None``) | |
:return: | |
Dictionary with results. | |
:notes: | |
Points where to sample fitted GP could be specified with both ``t_new`` | |
and ``n_new`` but former has the priority. If none is specified than | |
predictions for those points in ``t`` are returned. | |
""" | |
if p0 is None: | |
p0 = [np.mean(y), 0.2, 1, -2, 200] | |
kernel = p0[2]**2*george.kernels.RationalQuadraticKernel(log_alpha=p0[3], | |
metric=p0[4]**2) | |
gp = george.GP(kernel, mean=np.mean(y), fit_mean=True, | |
white_noise=np.log(p0[1]**2), fit_white_noise=True) | |
nll_ = partial(nll, y=y, gp=gp) | |
grad_nll_ = partial(grad_nll, y=y, gp=gp) | |
gp.compute(t, yerr) | |
print(gp.log_likelihood(y)) | |
p0 = gp.get_parameter_vector() | |
results = op.minimize(nll_, p0, jac=grad_nll_, method="L-BFGS-B") | |
gp.set_parameter_vector(results.x) | |
print("HP : {}".format(results.x)) | |
print(gp.log_likelihood(y)) | |
if t_new is None: | |
if n_new is None: | |
t_new = t | |
else: | |
t_new = np.linspace(t[0], t[-1], n_new) | |
# Kernel means using only RQ kernel n predictions | |
mu, cov = gp.predict(y, t_new, return_var=True, kernel=kernel) | |
std = np.sqrt(cov) | |
if fig is None: | |
fig = plt.figure(figsize=(12, 5)) | |
ax = fig.gca() | |
ax.errorbar(t, y, yerr=yerr, fmt=".", color=color, ms=3, elinewidth=0.5, | |
label=label) | |
ax.plot(t_new, mu, color=color, lw=1) | |
samples = gp.sample_conditional(y, t_new, size=100) | |
grad_samples = list() | |
for sample in samples: | |
plt.plot(t_new, sample, color="r", alpha=0.1) | |
grad_sample = np.gradient(sample) | |
grad_samples.append(grad_sample) | |
plt.plot(t_new, grad_sample, color="r", alpha=0.1) | |
ax.set_xlabel("Time, MJD") | |
ax.set_ylabel("Flux, Jy") | |
ax.fill_between(t_new, mu+std, mu-std, color=color, alpha=0.1) | |
if label is not None: | |
fig.legend(loc="best") | |
if show_fig: | |
fig.show() | |
if file_fig is not None: | |
fig.savefig(file_fig) | |
return {"gp": gp, "logL": gp.log_likelihood(y), "mu": mu, | |
"std": std, "fig": fig, "t_new": t_new, "t": t, "y": y, | |
"yerr": yerr, "samples": samples, "grad_samples": grad_samples} | |
def run(dataset, n_new=None, t_new=None): | |
""" | |
Runs ``optimize_gp`` for dataset of 3 frequencies | |
:param dataset: | |
Path to pickled dataset with lightcurves for 3 frequencies. | |
:param n_new: (optional) | |
Number of new points where to sample fitted GP. There will be ``n_new`` | |
points evently distributed in ``(t[0], t[-1])`` interval. If ``None`` | |
then time points where to sample fitted GP must be specified in | |
``t_new`` argument. (default: ``None``) | |
:param t_new: (optional) | |
Array of times where to sample fitted GP. This allows more efficiently | |
span time intervals near flare maximum. If ``None`` then time points | |
where to sample fitted GP must be specified by ``n_new`` argument. | |
(default: ``None``) | |
:return: | |
Figure and dictionary with results for each frequency. | |
""" | |
with open(dataset, "rb") as fo: | |
data = pickle.load(fo) | |
fig = plt.figure(figsize=(12, 5)) | |
result_dict = {} | |
for freq, data, color in zip(["15", "8", "5"], data, ['#1f77b4', '#ff7f0e', | |
'#2ca02c']): | |
t = data[:, 0] | |
y = data[:, 1] | |
yerr = data[:, 2] | |
result = optimize_gp(t, y, yerr, n_new=n_new, t_new=t_new, fig=fig, | |
color=color, label=freq) | |
result_dict.update({freq: result}) | |
return fig, result_dict | |
if __name__ == "__main__": | |
with open("0716+714_umrao_lc.pkl", "rb") as fo: | |
data = pickle.load(fo) | |
data15 = data[0] | |
# Use only part of the light curve to make it usable | |
t = data15[:, 0] | |
y = data15[:, 1] | |
yerr = data15[:, 2] | |
result = optimize_gp(t, y, yerr, n_new=2000, show_fig=True) | |
# fig, result_dict = run("0851+202_umrao_lc.pkl", n_new=5000) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment