@ipashchenko · Last active February 23, 2018 15:00
Empirical Bayes with RQ+WN kernel
import numpy as np
import george
import pickle
from functools import partial
import scipy.optimize as op
import matplotlib.pyplot as plt


def is_embraced(curve, means, widths):
    """
    Check if a 1D curve is embraced by a 1D band.
    :param curve:
        Iterable of curve's points.
    :param means:
        Iterable of band's means.
    :param widths:
        Iterable of widths (sigmas).
    :return:
        ``True`` if the curve is embraced by the band.
    """
    curve = np.array(curve)
    means = np.array(means)
    widths = np.array(widths)
    assert len(curve) == len(means) == len(widths)
    diff = np.abs(curve - means)
    # ``np.alltrue`` is deprecated in recent numpy, so use ``np.all``.
    return np.all(np.nan_to_num(diff) <= np.nan_to_num(widths))


def count_contained(curves, means, widths):
    """
    Count how many curves are contained inside a given band.
    :param curves:
        Iterable of numpy 1D arrays with curves to count.
    :param means:
        Iterable of band's means.
    :param widths:
        Iterable of widths (sigmas).
    :return:
        Number of curves within band.
    """
    i = 0
    for curve in curves:
        if is_embraced(curve, means, widths):
            i += 1
    return i


def calculate_scb(curves, mu, std, alpha=0.05, delta=0.01):
    """
    Calculate simultaneous confidence band.
    :param curves:
        Samples of GP.
    :param mu:
        Mean values.
    :param std:
        STD values.
    :param alpha: (optional)
        Significance level of a corresponding hypothesis test. (default:
        ``0.05``)
    :param delta: (optional)
        Step of widening of band. (default: ``0.01``)
    :return:
        Two 1D numpy arrays with lower and upper alpha-level simultaneous
        confidence bands.
    """
    n = count_contained(curves, mu, std)
    f = 1
    # Widen the band until it simultaneously contains at least a fraction
    # (1 - alpha) of the sampled curves.
    while n < len(curves) * (1 - alpha):
        f += delta
        n = count_contained(curves, mu, f*std)
    return mu - f*std, mu + f*std
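

# A minimal usage sketch for the band helpers above (not part of the
# original gist): draw i.i.d. normal "curves" around a known zero mean and
# widen a unit-sigma band until it simultaneously covers 95% of them. The
# name ``_demo_scb`` and all its locals are hypothetical.
def _demo_scb():
    rng = np.random.RandomState(42)
    mu = np.zeros(50)
    std = np.ones(50)
    curves = rng.normal(loc=mu, scale=std, size=(200, 50))
    low, up = calculate_scb(curves, mu, std, alpha=0.05)
    # The widened band should embrace at least 190 of the 200 draws.
    print(count_contained(curves, mu, up - mu))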


def find_maxima(samples, mu, std, grad_samples, alpha=0.05):
    """
    Find positions of maxima using samples from GP.
    :param samples:
        Iterable of arrays. Conditional samples from GP (with fixed HP in
        empirical Bayes and HP from their posterior in full Bayes).
    :param mu:
        Array of mean values.
    :param std:
        Array of STD values.
    :param grad_samples:
        Iterable of arrays. Gradients of ``samples``.
    :param alpha: (optional)
        Significance level of a corresponding hypothesis test. (default:
        ``0.05``)
    :return:
        1D array of positions and 2D array of errors (low, upper) of maximum
        positions, as indices into ``mu``.
    """
    grad_samples = np.atleast_2d(grad_samples)
    mu_grad = np.gradient(mu)
    std_grad = np.array([np.std(grad_samples[:, i]) for i in range(len(mu))])
    # Lower and upper boundaries of the SCB for the gradients.
    low, up = calculate_scb(grad_samples, mu_grad, std_grad, alpha=alpha)
    # The rest of the body is an assumed completion of the intended logic:
    # a maximum is where the mean gradient crosses zero from above, and its
    # error interval is where the gradient SCB still contains zero.
    max_indices = np.where((mu_grad[:-1] > 0) & (mu_grad[1:] <= 0))[0]
    errors = list()
    for idx in max_indices:
        i_low = idx
        while i_low > 0 and low[i_low] <= 0 <= up[i_low]:
            i_low -= 1
        i_up = idx
        while i_up < len(mu) - 1 and low[i_up] <= 0 <= up[i_up]:
            i_up += 1
        errors.append((i_low, i_up))
    return max_indices, np.atleast_2d(errors)


# Define the objective function (negative log-likelihood in this case).
def nll(p, y, gp):
    """
    :param p:
        Mean, log(Disp_WN**2), log(A**2), log(alpha), log(scale**2)
    """
    gp.set_parameter_vector(p)
    ll = gp.log_likelihood(y, quiet=True)
    return -ll if np.isfinite(ll) else 1e25


# And the gradient of the objective function.
def grad_nll(p, y, gp):
    """
    :param p:
        Mean, log(Disp_WN**2), log(A**2), log(alpha), log(scale**2)
    """
    gp.set_parameter_vector(p)
    return -gp.grad_log_likelihood(y, quiet=True)
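

# A small sanity check (a sketch, not part of the original gist) that the
# analytic gradient in ``grad_nll`` agrees with finite differences of
# ``nll``. ``_check_gradients`` is a hypothetical helper name; the GP must
# already be ``compute``-ed on the data.
def _check_gradients(gp, y):
    p0 = gp.get_parameter_vector()
    # ``op.check_grad`` returns the 2-norm of the difference between the
    # analytic and the finite-difference gradients; it should be small.
    return op.check_grad(nll, grad_nll, p0, y, gp)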


def optimize_gp(t, y, yerr, p0=None, t_new=None, n_new=None, show_fig=False,
                file_fig=None, color="#1f77b4", fig=None, label=None):
    """
    Optimize GP hyperparameters for given data.
    :param t:
        Array of times.
    :param y:
        Array of the observed values.
    :param yerr:
        Array of the observed errors.
    :param p0: (optional)
        Initial values for the hyperparameters ("mean", "wn_disp", "amp",
        "log_alpha", "scale"). If ``None`` then use defaults. (default:
        ``None``)
    :param t_new: (optional)
        Array of times where to sample the fitted GP. This allows one to
        span time intervals near flare maxima more efficiently. If ``None``
        then the points must be specified by the ``n_new`` argument.
        (default: ``None``)
    :param n_new: (optional)
        Number of new points where to sample the fitted GP. There will be
        ``n_new`` points evenly distributed in the ``(t[0], t[-1])``
        interval. If ``None`` then the points must be specified by the
        ``t_new`` argument. (default: ``None``)
    :param show_fig: (optional)
        Show figure? (default: ``False``)
    :param file_fig: (optional)
        File to save picture of fitted GP. If ``None`` then don't save
        picture. (default: ``None``)
    :param color: (optional)
        Color for plots. (default: ``"#1f77b4"``)
    :param fig: (optional)
        Figure to plot on. If ``None`` then create one. (default: ``None``)
    :param label: (optional)
        Label for the plot. If ``None`` then don't label. (default:
        ``None``)
    :return:
        Dictionary with results.
    :notes:
        Points where to sample the fitted GP can be specified with both
        ``t_new`` and ``n_new``, but the former has priority. If neither is
        specified then predictions are made for the points in ``t``.
    """
    if p0 is None:
        p0 = [np.mean(y), 0.2, 1, -2, 200]
    kernel = p0[2]**2 * george.kernels.RationalQuadraticKernel(log_alpha=p0[3],
                                                               metric=p0[4]**2)
    gp = george.GP(kernel, mean=np.mean(y), fit_mean=True,
                   white_noise=np.log(p0[1]**2), fit_white_noise=True)
    nll_ = partial(nll, y=y, gp=gp)
    grad_nll_ = partial(grad_nll, y=y, gp=gp)
    gp.compute(t, yerr)
    print(gp.log_likelihood(y))
    p0 = gp.get_parameter_vector()
    results = op.minimize(nll_, p0, jac=grad_nll_, method="L-BFGS-B")
    gp.set_parameter_vector(results.x)
    print("HP : {}".format(results.x))
    print(gp.log_likelihood(y))
    if t_new is None:
        if n_new is None:
            t_new = t
        else:
            t_new = np.linspace(t[0], t[-1], n_new)
    # Passing ``kernel`` here means that predictions use only the RQ part
    # of the kernel, i.e. the white-noise term is excluded.
    mu, var = gp.predict(y, t_new, return_var=True, kernel=kernel)
    std = np.sqrt(var)
    if fig is None:
        fig = plt.figure(figsize=(12, 5))
    ax = fig.gca()
    ax.errorbar(t, y, yerr=yerr, fmt=".", color=color, ms=3, elinewidth=0.5,
                label=label)
    ax.plot(t_new, mu, color=color, lw=1)
    samples = gp.sample_conditional(y, t_new, size=100)
    grad_samples = list()
    for sample in samples:
        ax.plot(t_new, sample, color="r", alpha=0.1)
        grad_sample = np.gradient(sample)
        grad_samples.append(grad_sample)
        ax.plot(t_new, grad_sample, color="r", alpha=0.1)
    ax.set_xlabel("Time, MJD")
    ax.set_ylabel("Flux, Jy")
    ax.fill_between(t_new, mu + std, mu - std, color=color, alpha=0.1)
    if label is not None:
        # ``loc="best"`` is only supported for axes legends, not for figure
        # legends, so legend the axes here.
        ax.legend(loc="best")
    if show_fig:
        fig.show()
    if file_fig is not None:
        fig.savefig(file_fig)
    return {"gp": gp, "logL": gp.log_likelihood(y), "mu": mu,
            "std": std, "fig": fig, "t_new": t_new, "t": t, "y": y,
            "yerr": yerr, "samples": samples, "grad_samples": grad_samples}


def run(dataset, n_new=None, t_new=None):
    """
    Run ``optimize_gp`` for a dataset of three frequencies.
    :param dataset:
        Path to pickled dataset with light curves for three frequencies.
    :param n_new: (optional)
        Number of new points where to sample the fitted GP. There will be
        ``n_new`` points evenly distributed in the ``(t[0], t[-1])``
        interval. If ``None`` then the points must be specified by the
        ``t_new`` argument. (default: ``None``)
    :param t_new: (optional)
        Array of times where to sample the fitted GP. This allows one to
        span time intervals near flare maxima more efficiently. If ``None``
        then the points must be specified by the ``n_new`` argument.
        (default: ``None``)
    :return:
        Figure and dictionary with results for each frequency.
    """
    with open(dataset, "rb") as fo:
        data = pickle.load(fo)
    fig = plt.figure(figsize=(12, 5))
    result_dict = {}
    # Use a distinct loop variable so the loaded ``data`` is not shadowed.
    for freq, freq_data, color in zip(["15", "8", "5"], data,
                                      ['#1f77b4', '#ff7f0e', '#2ca02c']):
        t = freq_data[:, 0]
        y = freq_data[:, 1]
        yerr = freq_data[:, 2]
        result = optimize_gp(t, y, yerr, n_new=n_new, t_new=t_new, fig=fig,
                             color=color, label=freq)
        result_dict.update({freq: result})
    return fig, result_dict


if __name__ == "__main__":
    with open("0716+714_umrao_lc.pkl", "rb") as fo:
        data = pickle.load(fo)
    # Use only the 15 GHz light curve of the dataset.
    data15 = data[0]
    t = data15[:, 0]
    y = data15[:, 1]
    yerr = data15[:, 2]
    result = optimize_gp(t, y, yerr, n_new=2000, show_fig=True)
    # fig, result_dict = run("0851+202_umrao_lc.pkl", n_new=5000)