Created
September 23, 2014 22:45
Mock astronomy image generation code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import division, print_function | |
import abc | |
import numpy as np | |
from matplotlib import pyplot as plt | |
from astropy import units as u | |
from astropy.modeling import Fittable2DModel, Parameter, fitting | |
from astropy.modeling import format_input as _format_input | |
maggie = u.def_unit('maggie', 3631*u.Jy) | |
nmaggie = u.def_unit('nmaggie', 3631e-9*u.Jy) | |
TWOPI = 2 * np.pi | |
class MockImage(object): | |
""" | |
Creates mock images to test photometry routines on | |
""" | |
def __init__(self, **kwargs): | |
kwargs.setdefault('verbose', False) | |
kwargs.setdefault('xsize', 512) | |
kwargs.setdefault('ysize', 512) | |
# SDSS r band from Fukugita+ 96 | |
kwargs.setdefault('filter_cen', 6261*u.angstrom) | |
kwargs.setdefault('filter_fwhm', 1382*u.angstrom) | |
kwargs.setdefault('effective_area', 1 * u.m**2) | |
kwargs.setdefault('quantum_efficiency', 1 * u.electron / u.photon) | |
kwargs.setdefault('gain', 1 * u.electron / u.adu) | |
kwargs.setdefault('read_noise', 0 * u.electron) | |
kwargs.setdefault('saturation', None) | |
kwargs.setdefault('bias', 0 * u.adu) | |
kwargs.setdefault('exp_time', 1 * u.s) | |
kwargs.setdefault('psf_model',MoffatPSFModel(alpha_x=5, alpha_y=5)) | |
kwargs.setdefault('background', {'type': 'constant', 'value': 1.*nmaggie}) | |
kwargs.setdefault('starlocs', [[], []]) | |
if 'starmags' in kwargs and 'starfluxs' in kwargs: | |
raise ValueError('cannot give both starmags and starfluxs') | |
elif 'starmags' in kwargs: | |
self.starmags = np.array(kwargs.pop('starmags')) | |
elif 'starfluxs' in kwargs: | |
self.starfluxs = np.array(kwargs.pop('starfluxs')) | |
else: | |
self.starfluxs = np.array([]) | |
kwargs.setdefault('gallocs', [[], []]) | |
kwargs.setdefault('galsizes', [[], []]) | |
kwargs.setdefault('galpas', []) | |
kwargs.setdefault('galns', []) | |
kwargs.setdefault('galconvwpsf', False) | |
if 'galmags' in kwargs and 'galfluxs' in kwargs: | |
raise ValueError('cannot give both galmags and galfluxs') | |
elif 'galmags' in kwargs: | |
self.galmags = np.array(kwargs.pop('galmags')) | |
elif 'galfluxs' in kwargs: | |
self.galfluxs = np.array(kwargs.pop('galfluxs')) | |
else: | |
self.galfluxs = np.array([]) | |
for nm in kwargs: | |
setattr(self, nm, kwargs[nm]) | |
self.starfluxsimg = None | |
self.galfluximg = None | |
self.bkgfluximg = None | |
self.artifactfluximg = None | |
self.fluximg = None | |
@property | |
def starmags(self): | |
return -2.5*np.log10(self.starfluxs.to(maggie).value) | |
@starmags.setter | |
def starmags(self, value): | |
self.starfluxs = maggie*10**(value/-2.5) | |
@property | |
def galmags(self): | |
return -2.5*np.log10(self.galfluxs.to(maggie).value) | |
@galmags.setter | |
def galmags(self, value): | |
self.galfluxs = maggie*10**(value/-2.5) | |
@property | |
def xsize(self): | |
return self._xsize | |
@xsize.setter | |
def xsize(self, value): | |
self._xsize = value | |
self._baseimg = self._coords = None | |
@property | |
def ysize(self): | |
return self._ysize | |
@ysize.setter | |
def ysize(self, value): | |
self._ysize = value | |
self._baseimg = self._coords = None | |
@property | |
def coords(self): | |
if self._coords is None: | |
x = np.arange(self.xsize) | |
y = np.arange(self.ysize) | |
x, y = np.meshgrid(x, y) | |
self._coords = (x.T, y.T) | |
return self._coords | |
@property | |
def baseimg(self): | |
if self._baseimg is None: | |
self._baseimg = np.ones((self.xsize, self.ysize)) | |
return self._baseimg | |
def _print(self, *args, **kwargs): | |
if self.verbose: | |
print(*args, **kwargs) | |
sys.stdout.flush() | |
def create_stars_flux(self): | |
xs, ys = self.starlocs | |
flxs = self.starfluxs | |
if len(flxs) != len(xs): | |
raise ValueError('number of star fluxes must match locations') | |
starimg = 0*nmaggie * self.baseimg | |
mod = self.psf_model.copy() | |
mod.flux = 1 | |
for i, (x, y, flx) in enumerate(zip(xs, ys, flxs)): | |
self._print('\rOn star {i} of {nstars}'.format(i=i+1, nstars=len(xs)), end='') | |
mod.cen_x = x | |
mod.cen_y = y | |
starimg += flx * mod(*self.coords) | |
self._print('') | |
return starimg | |
def create_galaxies_flux(self): | |
from astropy import convolution | |
xs, ys = self.gallocs | |
flxs = self.galfluxs | |
rxs, rys = self.galsizes | |
pas = self.galpas | |
ns = self.galns | |
mod = GalaxyModel() | |
galimg = 0*nmaggie * self.baseimg | |
zpars = zip(xs, ys, flxs, rxs, rys, pas, ns) | |
for i, (x, y, flx, rx, ry, pa, n) in enumerate(zpars): | |
self._print('\rOn galaxy {i} of {nstars}'.format(i=i+1, nstars=len(xs)), end='') | |
mod.cen_x = x | |
mod.cen_y = y | |
mod.re_x = rx | |
mod.re_y = ry | |
mod.pa = pa | |
mod.n = n | |
galimg += flx * mod(*self.coords) | |
self._print('') | |
if self.galconvwpsf and (len(xs) > 0): | |
self._print("Convolving gal flux w/PSF") | |
xsz = (self.psf_model.scale_x*-2 - 0.5, self.psf_model.scale_x*2 + 0.5) | |
ysz = (self.psf_model.scale_y*-2 - 0.5, self.psf_model.scale_y*2 + 0.5) | |
ckernel = convolution.discretize_model(self.psf_model, xsz, ysz) | |
cgalimg = convolution.convolve_fft(galimg.value, ckernel) | |
#conserve flux! also preserve the unit val `galimg` | |
fluxratio = np.sum(galimg) / np.sum(cgalimg) | |
galimg = fluxratio * cgalimg | |
return galimg | |
def create_background_flux(self): | |
if self.background['type'] == 'constant': | |
return self.baseimg * self.background['value'] | |
else: | |
raise ValueError('Unrecognized background type:' + str(self.background['type'])) | |
def create_artifacts_flux(self): | |
from warnings import warn | |
warn('artifacts not yet implemented') | |
return 0*nmaggie * self.baseimg | |
def generate_flux_image(self): | |
self.starfluximg = self.create_stars_flux() | |
self.galfluximg = self.create_galaxies_flux() | |
self.bkgfluximg = self.create_background_flux() | |
self.artifactfluximg = self.create_artifacts_flux() | |
self.fluximg = self.starfluximg + self.galfluximg + self.bkgfluximg + self.artifactfluximg | |
return self.fluximg | |
def convert_to_counts(self, img=None, subtract_bkg=False, poisson=True): | |
""" | |
Parameters | |
---------- | |
img | |
The flux image to convert or None to use ``self.fluximg`` | |
subtract_bkg : True, False, or 'smooth' | |
If True, subtract the background noisly (with a new realization of | |
the background). If 'smooth', use a smooth background. | |
random | |
Returns | |
------- | |
countimg | |
The image in ADUs (integer if `poisson` is True) | |
""" | |
from astropy.constants import h | |
if img is None: | |
img = self.fluximg | |
# not exactly right for non-zero width, but close enough? | |
cenfreq = self.filter_cen.to(u.Hz, u.spectral()) | |
freq1 = (self.filter_cen - self.filter_fwhm/2).to(u.Hz, u.spectral()) | |
freq2 = (self.filter_cen + self.filter_fwhm/2).to(u.Hz, u.spectral()) | |
bandwidthfreq = np.ptp([freq1.value, freq2.value]) * u.Hz | |
ph_to_en = u.photon / (h * cenfreq) | |
photonimg = (ph_to_en * img * bandwidthfreq * self.exp_time * | |
self.effective_area).to(u.photon) | |
electronimg = photonimg * self.quantum_efficiency | |
#now apply read noise | |
if self.read_noise.value: | |
rnoise = np.random.randn(self.xsize, self.ysize) * self.read_noise | |
electronimg = electronimg + rnoise | |
self.countimg = aduimg = (electronimg / self.gain).to(u.adu) | |
if self.saturation is not None: | |
self.last_saturated = aduimg > self.saturation | |
aduimg[self.last_saturated] = self.saturation | |
if poisson: | |
res = (np.random.poisson(aduimg.value) * u.adu + self.bias).astype(int) | |
else: | |
res = aduimg.value * u.adu + self.bias | |
if subtract_bkg: | |
oldsat = self.saturation | |
try: | |
self.saturation = None | |
if subtract_bkg == 'smooth': | |
return res - self.convert_to_counts(self.bkgfluximg, poisson=False) | |
else: | |
return res - self.convert_to_counts(self.bkgfluximg, poisson=True) | |
finally: | |
self.saturation = oldsat | |
else: | |
return res | |
def find_psf_stars(self, imgmsk=None, magcut=None, isodistance=20): | |
""" | |
Finds suitable PSF stars. They have to be isolated out to `isodistance`, | |
no pixels in `imgmsk`, and bright enough according to `magcut` | |
""" | |
from scipy.spatial import cKDTree | |
sx, sy = self.starlocs | |
spixx = np.round(sx).astype(int) | |
spixy = np.round(sy).astype(int) | |
gx, gy = self.gallocs | |
gpixx = np.round(gx).astype(int) | |
gpixy = np.round(gy).astype(int) | |
allocs = np.array([np.concatenate((sx, gx)), np.concatenate((sy, gy))]) | |
#allmags = np.concatenate([self.starmags, self.galmags]) | |
kdt = cKDTree(allocs.T) | |
dnearest = kdt.query(self.starlocs.T, 2)[0][:, 1] | |
msk = np.ones(len(self.starmags), dtype=bool) | |
if imgmsk is not None: | |
# if the nearest pixel to the second is in `imgmsk`/saturated, don't | |
# include it | |
print(imgmsk[spixx, spixy]) | |
msk = msk & ~imgmsk[spixx, spixy] | |
if isodistance is not None: | |
msk = msk & (dnearest > isodistance) | |
msk = msk & (isodistance < spixx) & (spixx < (self.xsize - isodistance)) | |
msk = msk & (isodistance < spixy) & (spixy < (self.ysize - isodistance)) | |
if magcut is not None: | |
msk = msk & (self.starmags < magcut) | |
return np.where(msk)[0] | |
def show_in_ds9(self, img=None, frame=None, ds9=None, markstars=True, markgals=True, inclmags=False): | |
from pyds9 import DS9 | |
if ds9 is None: | |
ds9 = DS9() | |
if img is None: | |
img = self.fluximg.view(np.ndarray) | |
else: | |
img = img.view(np.ndarray) | |
if img.dtype.kind == 'i': | |
img = img.astype('int32') | |
if frame is not None: | |
ds9.set('frame {0}'.format(frame)) | |
ds9.set_np2arr(img) | |
if markstars: | |
if inclmags: | |
regs = ['text {0} {1} "s,{2:.2f}" # color=red'.format(x+1, y+1, m) | |
for y, x, m in zip(self.starlocs[0], self.starlocs[1], self.starmags)] | |
else: | |
regs = ['point {0} {1} # point=x color=red'.format(x+1, y+1) for y, x in zip(*self.starlocs)] | |
ds9.set('regions', '\n'.join(regs)) | |
if markgals: | |
if inclmags: | |
regs = ['text {0} {1} "g,{2:.2f}" # color=red'.format(x+1, y+1, m) | |
for y, x, m in zip(self.gallocs[0], self.gallocs[1], self.galmags)] | |
else: | |
regs = ['point {0} {1} # point=diamond color=red'.format(x+1, y+1) for y, x in zip(*self.gallocs)] | |
ds9.set('regions', '\n'.join(regs)) | |
return ds9 | |
class GalaxyModel(object): | |
""" | |
sersic model w/ rotation. total flux is 1 | |
""" | |
def __init__(self, cen_x=0, cen_y=0, re_x=1, re_y=1, pa=0*u.deg, n=1): | |
self.cen_x = cen_x | |
self.cen_y = cen_y | |
self.re_x = re_x | |
self.re_y = re_y | |
self.pa = pa | |
self.n = n | |
def __call__(self, x, y): | |
from scipy.special import gamma | |
cpa = np.cos(self.pa).value | |
spa = np.sin(self.pa).value | |
dx = x - self.cen_x | |
dy = y - self.cen_y | |
x = dx*cpa + dy*spa | |
y = -dx*spa + dy*cpa | |
r = np.hypot(x / self.re_x, y / self.re_y) | |
bn = 1.9992*self.n - 0.3271 | |
twon = 2 * self.n | |
ltot = self.re_x * self.re_y * 2 * np.pi * self.n * np.exp(bn) * (bn**-twon) * gamma(twon) | |
return np.exp(-bn*(r**(1/self.n)-1)) / ltot | |
class PSFModel(Fittable2DModel): | |
__metaclass__ = abc.ABCMeta | |
def fit_data(self, imgdata, xy=None, fittertype=fitting.LevMarLSQFitter, **kwargs): | |
""" | |
Fit the model to provided image data. | |
Note that this ignores bounds when using `LevMarLSQFitter` because | |
it seems to be broken. | |
Parameters | |
---------- | |
imdata : 2D array | |
The image data to fit this model to | |
xy : 2-tuple of arrays or None | |
An (x, y) tuple of the pixel grid (each should be 2D), or None to | |
use a default 0-indexed grid. | |
Returns | |
------- | |
mod : same as self | |
A new model with the best-fit parameters. The fitter is available | |
as `self.last_fitter` | |
""" | |
if xy is None: | |
x, y = np.mgrid[:imgdata.shape[0], :imgdata.shape[1]] | |
else: | |
x, y = xy | |
f = fittertype() | |
if fittertype == fitting.LevMarLSQFitter: | |
mod0 = self.copy() | |
for nm in mod0.bounds: | |
mod0.bounds[nm] = (None, None) | |
else: | |
mod0 = self | |
res = f(mod0, x, y, imgdata, **kwargs) | |
self.last_fitter = res.last_fitter = f | |
#compute the chi-squared | |
res.chi2 = np.sum((res(x, y) - imgdata)**2) | |
dofs = x.size - len(res.parameters) + sum(res.fixed.values()) | |
res.chi2r = res.chi2 / dofs | |
return res | |
def imshow_model(self, xy=None, n=100, **kwargs): | |
""" | |
A convinience method to show the model as a discritized image | |
""" | |
kwargs.setdefault('origin', 'lower') | |
if xy is None: | |
x, y = np.mgrid[:n, :n] | |
else: | |
x, y = xy | |
img = self(x, y) | |
kwargs.setdefault('interpolation', 'none') | |
return plt.imshow(img, **kwargs) | |
@abc.abstractproperty | |
def scale_x(self): | |
"""Some sort of meaningful scale along the x direction""" | |
raise NotImplementedError | |
@abc.abstractproperty | |
def scale_y(self): | |
"""Some sort of meaningful scale along the y direction""" | |
raise NotImplementedError | |
class MoffatPSFModel(PSFModel): | |
flux = Parameter(default=1) | |
cen_x = Parameter(default=0) | |
cen_y = Parameter(default=0) | |
alpha_x = Parameter(default=1, min=0) | |
alpha_y = Parameter(default=1, min=0) | |
theta = Parameter(default=0, min=0, max=45) # in degrees | |
beta = Parameter(default=4.765, fixed=True) | |
linear = False | |
@staticmethod | |
#def eval(x, y, flux, cen_x, cen_y, alpha_x, alpha_y, theta): | |
def eval(x, y, flux, cen_x, cen_y, alpha_x, alpha_y, theta, beta): | |
""" | |
Normalized such that the integral out to infinity is 1 | |
""" | |
from math import radians | |
if theta == 0: | |
x1 = x - cen_x | |
y1 = y - cen_y | |
else: | |
thetar = radians(theta) | |
s = np.sin(thetar) | |
c = np.cos(thetar) | |
x1 = (x-cen_x) * c - (y-cen_y) * s | |
y1 = (x-cen_x) * s + (y-cen_y) * c | |
r = np.hypot(x1/alpha_x, y1/alpha_y) | |
return flux*(beta-1)*(1 + r**2)**-beta / (np.pi * alpha_x * alpha_y) | |
@property | |
def scale_x(self): | |
return self.alpha_x | |
@scale_x.setter | |
def scale_x(self, value): | |
self.alpha_x = value | |
@property | |
def scale_y(self): | |
return self.alpha_y | |
@scale_y.setter | |
def scale_y(self, value): | |
self.alpha_y = value | |
class GaussianPSFModel(PSFModel): | |
flux = Parameter(default=1) | |
cen_x = Parameter(default=0) | |
cen_y = Parameter(default=0) | |
sig = Parameter(default=1, min=0) | |
linear = False | |
@staticmethod | |
def eval(x, y, flux, cen_x, cen_y, sig): | |
""" | |
Normalized such that the integral out to infinity is 1 | |
""" | |
from math import radians | |
r = np.hypot(x - cen_x, y - cen_y)/sig | |
return flux*np.exp(-0.5 * r**2)/(sig**2)/TWOPI | |
@property | |
def scale_x(self): | |
return self.sig_x | |
@scale_x.setter | |
def scale_x(self, value): | |
self.sig_x = value | |
@property | |
def scale_y(self): | |
return self.sig_y | |
@scale_y.setter | |
def scale_y(self, value): | |
self.sig_y = value | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment