Last active
April 7, 2024 16:59
-
-
Save Sunmish/198ef88e1815d9ba66c0f3ef3b18f74c to your computer and use it in GitHub Desktop.
Measure integrated flux densities in images.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# TODO: | |
# Add precision options. Currently values are returned at arbitrary precision. | |
# Add `strip_extra_axes` function for use when reading in FITS images. | |
# Clean up read_fits and associated FITS file handling functions. | |
# Fix up documentation. | |
# This code is years worth of additions and changes that represent my own 'journey' in learning python | |
# and studying radio astronomy - so some (a lot) of it is horribly ineffecient. | |
import numpy as np | |
import math | |
import os | |
import sys | |
from astropy.io import fits # For handling FITS files. | |
from astropy.wcs import WCS # For computing LAS/positions. | |
from astropy.wcs.utils import proj_plane_pixel_scales | |
from astropy.coordinates import SkyCoord # For cross-referencing. | |
# SkyCoord or coordinates are returning import errors? | |
# This is needed for `measure_tree` - finding a source in a catalogue. | |
from astropy import units as u | |
from scipy.special import erf | |
import logging | |
logging.basicConfig(format="%(levelname)s (%(module)s): %(message)s", \ | |
level=logging.INFO) | |
units = {"arcsec": u.arcsec, \ | |
"arcmin": u.arcmin, \ | |
"deg" : u.degree, \ | |
"degree": u.degree, \ | |
"as" : u.arcsec, \ | |
"am" : u.arcmin, \ | |
"radian": u.radian, \ | |
"rad" : u.radian | |
} | |
# def output_formatter(source_number=None, | |
# int_flux=None, | |
# err_int_flux=None, | |
# peak_flux=None, | |
# avg_flux=None, | |
# npix=None, | |
# weight_coords=None, | |
# bright_coords=None, | |
# area=None, | |
# las=None, | |
# offset=None) | |
def angular_distance(coords1, coords2): | |
"""Get the angular distance between a set of RA, DEC coordinates in [dd]. | |
Uncessary because of SkyCoord? | |
""" | |
cos_angle = math.sin(math.radians(coords1[1])) * \ | |
math.sin(math.radians(coords2[1])) + \ | |
math.cos(math.radians(coords1[1])) * \ | |
math.cos(math.radians(coords2[1])) * \ | |
math.cos(math.radians(coords1[0]) - math.radians(coords2[0])) | |
try: | |
gamma = math.degrees(math.acos(cos_angle)) | |
except ValueError: | |
gamma = math.degrees(math.acos(min(1, max(cos_angle, -1)))) | |
return gamma | |
def gaussian_blob_correction(peak, rms, sigma): | |
"""https://ui.adsabs.harvard.edu/abs/2012MNRAS.425..979H/abstract | |
""" | |
snr = peak / rms | |
eta = (erf(np.sqrt(-np.log((sigma / snr)))))**2 | |
err_eta = np.abs(eta - (erf(np.sqrt(-np.log((sigma / ((peak-rms)/rms))))))**2) | |
return eta, err_eta | |
def dirtybias_correction(model_flux, residual_flux, source_bpp, | |
clean_bias_factor=1., | |
clean_bias_std=0.): | |
""" | |
""" | |
model_flux = np.asarray(model_flux) | |
model_flux[np.where(model_flux < 0.)] = 0. | |
total_model_flux = np.nansum(model_flux) | |
print(">>> model flux: {} Jy".format(total_model_flux)) | |
residual_flux = np.asarray(residual_flux) | |
residual_flux[np.where(residual_flux < 0.)] = 0. | |
total_residual_flux = np.nansum(residual_flux / source_bpp) | |
print(">>> residual flux: {} ({}) Jy".format(total_residual_flux, | |
total_residual_flux/clean_bias_factor)) | |
total_model_flux += (total_residual_flux/clean_bias_factor) | |
residual_err = clean_bias_std*total_residual_flux | |
print(">>> model + residual flux {} Jy".format(total_model_flux)) | |
print(">>> residual error {} Jy".format(residual_err)) | |
return total_model_flux, residual_err | |
# https://stackoverflow.com/a/61629292/3293881 @Ehsan | |
def norm_method(arr, point): | |
point = np.asarray(point) | |
return np.linalg.norm(np.indices(arr.shape, sparse=True)-point) | |
def order_boundaries(coords): | |
"""Order boundary coordinates based on nearest-neighbours. | |
Only used for polygon annotations that are not too messy. | |
""" | |
un_x = [coord[0] for coord in coords] | |
un_y = [coord[1] for coord in coords] | |
ord_coord = [(un_x.pop(0), un_y.pop(0))] | |
for x, y in ord_coord: | |
init = True | |
index = None | |
for i in range(len(un_x)): | |
d = np.sqrt((x-un_x[i])**2 + (y-un_y[i])**2) | |
if init: | |
diff = d | |
index = i | |
init = False | |
elif d < diff: | |
diff = d | |
index = i | |
else: | |
pass | |
if index is None: | |
ord_coord.append(ord_coord[0]) | |
break | |
else: | |
ord_coord.append((un_x.pop(index), un_y.pop(index))) | |
return ord_coord | |
def kvis_polygon(region, r_index=0): | |
"""Get coordinate list for polygon (CLINES) in annotation file. | |
TDOD: ds9 version? | |
""" | |
count = 0 | |
f = open(region, "r") | |
lines = f.readlines() | |
polygons = [] | |
for line in lines: | |
if "clines" in line.lower(): | |
polygons.append(line) | |
poly = polygons[r_index] # If there are more than one cline entries. | |
bits = poly.split(" ") | |
coords = [] | |
for i in range(1, len(bits)-1, 2): | |
coords.append((float(bits[i]), float(bits[i+1]))) | |
f.close() | |
return coords | |
def pix_to_world(x, y, wcs): | |
"""Convenience function for astropy's all_pix2world.""" | |
sub_wcs = wcs.sub(["longitude", "latitude"]) | |
if not isinstance(x, np.ndarray): | |
x, y = np.asarray(x), np.asarray(y) | |
ra, dec = sub_wcs.all_pix2world(y, x, 0) | |
return ra, dec | |
def world_to_pix(ra, dec, warray, naxis): | |
"""A wrapper for astropy's all_world2pix. | |
This helps with issues where NAXIS > 2. | |
Returns lists if the input pixel coordinates are lists/arrays. | |
TODO: use sub_wcs | |
""" | |
if (not isinstance(ra, list)) and (not isinstance(ra, np.ndarray)): | |
ra, dec = [ra], [dec] | |
# if naxis == 2: | |
y, x = warray.all_world2pix(ra, dec, 0) | |
# elif naxis == 3: | |
# y, x, _ = warray.all_world2pix(ra, dec, np.ones_like(ra), 0) | |
# elif naxis == 4: | |
# y, x, _, _ = warray.all_world2pix(ra, dec, np.ones_like(ra), \ | |
# np.ones_like(ra), 0) | |
# else: | |
# raise ValueError(">>> NAXIS must be 2, 3, or 4.") | |
if len(x) == 1: x, y = x[0], y[0] | |
return x, y | |
def get_hdulist(fitsimage): | |
"""Get HDUList object from filepath or form HDUList object.""" | |
if isinstance(fitsimage, str): | |
if not fitsimage.endswith(".fits"): fitsimage += ".fits" | |
hdulist = fits.open(fitsimage) | |
opened = True | |
elif isinstance(fitsimage, fits.HDUList): | |
hdulist = fitsimage | |
opened = False | |
else: | |
raise ValueError("FITS image must be a filename/path or an " \ | |
"astropy.io.fits.HDUList object.") | |
return hdulist, opened | |
def strip_naxis(fitsimage, outname=None, strip_wcs=True): | |
"""Strip additional axes from FITS file. | |
Strips axes 3 and/or 4 if present. | |
""" | |
image, opened = get_hdulist(fitsimage) | |
try: | |
freq = image[0].header["CRVAL3"] | |
image[0].header["FREQ"] = freq | |
except KeyError: | |
pass | |
if image[0].header["NAXIS"] == 3: | |
image[0].data = image[0].data[0, :, :] | |
elif image[0].header["NAXIS"] == 4: | |
image[0].data = image[0].data[0, 0, :, :] | |
else: | |
pass | |
if strip_wcs: | |
try: image[0].header["FREQ"] = image[0].header["CRVAL3"] | |
except KeyError: pass | |
for i in ["3", "4"]: | |
for j in ["CRVAL"+i, "CTYPE"+i, "CDELT"+i, "CRPIX"+i, "NAXIS"+i, "CUNIT"+i, "CROTA"+i]: | |
try: del image[0].header[j] | |
except KeyError: pass | |
image[0].header["NAXIS"] = 2 | |
if outname is None: | |
if not fitsimage.endswith(".fits"): fitsimage += ".fits" | |
outname = fitsimage | |
image.writeto(outname, clobber=True) | |
if opened: | |
image.close() | |
def find_freq(hdr): | |
""" | |
""" | |
freq = None | |
if "FREQ" in hdr.keys(): | |
freq = hdr["FREQ"] | |
if "CRVAL3" in hdr.keys() and freq is None: | |
if "FREQ" in hdr["CTYPE3"]: | |
freq = hdr["CRVAL3"] | |
if "CRVAL4" in hdr.keys() and freq is None: | |
if "FREQ" in hdr["CTYPE4"]: | |
freq = hdr["CRVAL4"] | |
if "CENTCHAN" in hdr.keys() and freq is None: | |
# specific to MWA data | |
freq = float(hdr["CENTCHAN"]) * 1.28e6 | |
return freq | |
def read_fits(fitsimage): | |
"""Read a FITS file with astropy.io.fits and return relevant data. | |
Because the FITS standard is in fact not completely standard there are | |
a few little things that need to be checked to look for the necessary | |
header data. | |
""" | |
opened = False | |
if isinstance(fitsimage, str): | |
# if not fitsimage.endswith(".fits"): fitsimage += ".fits" | |
hdulist = fits.open(fitsimage) | |
opened = True | |
elif isinstance(fitsimage, fits.HDUList): | |
hdulist = fitsimage | |
else: | |
raise IOError(">>> Specified FITS image must an astropy.io.fits.HDUList" \ | |
"object or a filepath.") | |
harray = hdulist[0].header | |
farray = np.squeeze(hdulist[0].data) | |
naxis = harray["NAXIS"] | |
if ("NAXIS3" in harray.keys()) and naxis == 2: | |
for k in ["NAXIS", "CDELT", "CRVAL", "CTYPE", "CRPIX"]: | |
del harray[k+"3"] | |
if ("NAXIS4" in harray.keys()) and naxis < 4: | |
for k in ["NAXIS", "CDELT", "CRVAL", "CTYPE", "CRPIX"]: | |
del harray[k+"4"] | |
warray = WCS(harray).celestial | |
if opened: hdulist.close() | |
# Read some things from the FITS header and check some things: | |
# try: farray.shape = (harray["NAXIS2"], harray["NAXIS1"]) | |
# except ValueError: raise ValueError(">>> FITS files must be flat.") | |
# Try to find pixel sizes: | |
try: | |
cd1, cd2 = harray["CDELT1"], harray["CDELT2"] | |
except KeyError: | |
try: | |
cd1, cd2 = harray["CD1_1"], harray["CD2_2"] | |
except KeyError: | |
raise KeyError(">>> Cannot find key for pixel sizes.") | |
# Beam parameters: | |
semi_a, semi_b, beam_area, beam_not_found = None, None, None, True | |
if ("BMAJ" in harray.keys()) and ("BMIN" in harray.keys()): | |
semi_a = 0.5 * harray["BMAJ"] # Normal keys. | |
semi_b = 0.5 * harray["BMIN"] | |
beam_not_found = False | |
elif ("CLEANBMJ" in harray.keys()) and ("CLEANBMN" in harray.keys()): | |
semi_a = 0.5 * harray["CLEANBMJ"] # Abnormal keys, e.g. VLSSr. | |
semi_b = 0.5 * harray["CLEANBMN"] | |
beam_not_found = False | |
else: | |
try: # Check for NVSS: | |
for i in range(len(harray["HISTORY"])): | |
if "'NRAO VLA SKY SURVEY" in harray["HISTORY"][i]: | |
semi_a = 0.5 * 1.2500e-2 | |
semi_b = 0.5 * 1.2500e-2 | |
beam_not_found = False | |
if beam_not_found: raise KeyError | |
except KeyError: | |
try: # AIPS has a tell-tale mark: | |
for i in range(len(harray["HISTORY"])): | |
if "BMAJ" in harray["HISTORY"][i]: | |
l = [] | |
for t in harray["HISTORY"][i].split(): | |
try: l.append(float(t)) | |
except ValueError: pass | |
semi_a = 0.5 * l[0] | |
semi_b = 0.5 * l[1] | |
beam_not_found = False | |
if beam_not_found: raise KeyError | |
except (KeyError, IndexError): | |
try: # Check for SUMSS, which does NOT have the beam information. | |
# We will use the declination-dependent beam size based on | |
# the reference pixel of the FITS file. | |
for i in range(len(harray["HISTORY"])): | |
if "sumss" in harray["HISTORY"][i].lower(): | |
decl = math.radians(abs(harray["CRVAL2"])) | |
# beam_area = 0.25 * np.pi * (45.0**2 / 3600.0**2) * \ | |
# (1.0 / math.sin(decl)) | |
semi_a = 0.5 * (45./3600.) | |
semi_b = 0.5 * (45./(3600. * math.sin(decl))) | |
beam_not_found = False | |
if beam_not_found: raise KeyError | |
except KeyError: | |
raise KeyError(">>> Beam parameters not found. {}".format(fitsimage)) | |
if beam_area is None: | |
beam_area = np.pi * semi_a * semi_b | |
bpp = beam_area / (abs(cd1*cd2) * np.log(2.0)) | |
beam = (semi_a*2., semi_b*2.) | |
beam_major = semi_a*2. | |
beam_minor = semi_b*2. | |
# include abs() for beam params as sometimes these can be negative... | |
return np.squeeze(farray), warray, abs(bpp), cd1, cd2, naxis, abs(beam_major), abs(beam_minor) | |
def get_header(fitsimage): | |
""" | |
""" | |
opened = False | |
if isinstance(fitsimage, str): | |
# if not fitsimage.endswith(".fits"): fitsimage += ".fits" | |
hdulist = fits.open(fitsimage) | |
opened = True | |
elif isinstance(fitsimage, fits.HDUList): | |
hdulist = fitsimage | |
else: | |
raise IOError(">>> Specified FITS image must an astropy.io.fits.HDUList" \ | |
"object or a filepath.") | |
harray = hdulist[0].header | |
return harray | |
def write_fits_table(outname, names, *params): | |
""" | |
""" | |
columns_to_add = [] | |
for i, data in enumerate(params): | |
if names[i] == "name": | |
fformat = "A" | |
else: | |
fformat = "E" | |
columns_to_add += [ | |
fits.Column(name=names[i], format=fformat, | |
array=data) | |
] | |
if not outname.lower().endswith(".fits"): | |
outname += ".fits" | |
hdu = fits.BinTableHDU.from_columns(columns_to_add) | |
hdu.writeto(outname, overwrite=True) | |
def get_bpp(bmaj, bmin, cd1, cd2): | |
return (np.pi*bmaj*bmin) / (4.*abs(cd1*cd2)*np.log(2.)) | |
def rms_array(rms, farray=None): | |
"""Load or generate rms array. | |
""" | |
if farray is not None: | |
farray = np.squeeze(farray) | |
try: | |
rms = float(rms) | |
rarray = None | |
except (TypeError, ValueError): | |
if isinstance(rms, str): rarray = fits.getdata(rms) | |
elif isinstance(rms, fits.HDUList): rarray = rms[0].data | |
else: raise ValueError(">>> RMS must be specified as either a single " \ | |
"value or as an array/filepath.") | |
if rarray is None: | |
if farray is None: | |
raise ValueError(">>> If RMS array is to be made a template array" \ | |
" must be specified") | |
else: | |
rarray = np.full_like(farray, rms) | |
rarray = np.squeeze(rarray) | |
if rarray.shape != farray.shape: | |
print(farray.shape, rarray.shape) | |
raise ValueError(">>> RMS array and image array must be the same size.") | |
return np.squeeze(rarray) | |
def psf_array(psf, farray=None, index=0): | |
""" | |
""" | |
try: | |
psf = float(psf) | |
parray = None | |
except (TypeError, ValueError): | |
if isinstance(psf, str): | |
parray = fits.getdata(psf)[index, ...] | |
elif isinstance(psf, fits.HDUList): | |
parray = psf[0].data[index, ...] | |
else: | |
raise ValueError(">>> PSF must be specified as either a single " | |
"value or a as an array/filepath.") | |
if parray is None: | |
if farray is None: | |
raise ValueError(">>> If PSF array is to be made a template array " | |
"must be specified.") | |
else: | |
parray = np.full_like(farray, psf) | |
parray = np.squeeze(parray) | |
if parray.shape != farray.shape: | |
raise ValueError(">>> PSF array and image array must be the same size.") | |
return parray | |
# def pixscale_array(cd, farray=None, index=0) | |
def blank_fits_diff(image, ref, outname, rms, sigma=3., overwrite=False): | |
"""Blank image based on ref where ref >= rms*sigma. | |
rms can be image or single value as in other tasks. | |
""" | |
farray, warray, _, _, _, naxis = read_fits(ref) | |
rarray = rms_array(rms, farray) | |
strip_naxis(image, outname=image[:-5]+"_stripped.fits") | |
im = fits.open(image[:-5]+"_stripped.fits") | |
wm = WCS(im[0].header) | |
if naxis > 2: | |
raise ValueError("NAXIS must be <= 2.") | |
ref_ra, ref_dec, ref_x, ref_y = [], [], [], [] | |
for i in range(farray.shape[0]): | |
for j in range(farray.shape[1]): | |
r, d = pix_to_world(i, j, warray) | |
ref_ra.append(r) | |
ref_dec.append(d) | |
ref_x.append(i) | |
ref_y.append(j) | |
ref_c = SkyCoord(ref_ra, ref_dec, unit=(u.deg, u.deg)) | |
im_ra, im_dec, im_x, im_y = [], [], [], [] | |
for i in range(im[0].data.shape[0]): | |
for j in range(im[0].data.shape[1]): | |
if not np.isnan(im[0].data[i, j]): | |
r, d = pix_to_world(i, j, wm) | |
c = SkyCoord(r, d, unit=(u.deg, u.deg)) | |
index = c.match_to_catalog_sky(ref_c)[0] | |
x, y = ref_x[index], ref_y[index] | |
if farray[x, y] < sigma*rarray[x, y]: | |
im[0].data[i, j] = np.nan | |
im.writeto(outname, clobber=overwrite) | |
class Tree(): | |
"""Grow a tree with branches and leaves. | |
Grow a source from neighbouring pixels. | |
""" | |
def __init__(self, tree_number, threshold): | |
""" 'The seed of a tree of giants.' """ | |
self.no = tree_number # The source ID. | |
self.th = threshold[0] # Threshold above rms needed for detection. | |
self.gh = threshold[1] # Threshold above rms needed for growing the detection. | |
self.mh = threshold[2] # maximum threshold above rms | |
def seedling(self, m, n, farray, warray, rarray, forest, diagonals, barray): | |
"""Start from a seedling and grow a tree.""" | |
if farray[m, n] >= self.th*rarray[m, n]: # Detection! | |
forest[m, n] = self.no # Set ref to ID | |
self.leaves = 1 # Count pixels | |
self.fluxes = np.array([farray[m, n]]) # Add flux | |
self.coords = np.array([(m, n)]) # Add pixel coordinates | |
self.bounds = np.array([(m, n)]) # Boundary coordinates | |
self.bright = farray[m, n] # Brightest pixel | |
self.bcoord = [(m, n)] # BP coordinates | |
self.rluxes = np.array([rarray[m, n]]) # Add rmses | |
self.bpp = np.array([barray[m, n]]) | |
indices = [(m, n)] # Indices to check. This is added to. | |
for i, j in indices: | |
# Surrounding pixels: | |
if diagonals: | |
surrounding_indices = [(i-1, j-1), (i-1, j), (i, j-1), \ | |
(i+1, j-1), (i-1, j+1), (i+1, j), (i, j+1), (i+1, j+1)] | |
else: | |
surrounding_indices = [(i-1, j), (i, j-1), (i+1, j), \ | |
(i, j+1)] | |
if i == m and j == n: boundary = True | |
else: boundary = False | |
for index in surrounding_indices: | |
if (index[0] < 0) or (index[1] < 0): | |
pass | |
else: | |
try: | |
if np.isnan(forest[index]) and \ | |
(farray[index] >= self.gh*rarray[index]) and \ | |
(farray[index] < self.mh*rarray[index]): | |
self.leaves += 1 | |
self.fluxes = np.append(self.fluxes, \ | |
farray[index]) | |
self.bpp = np.append(self.bpp, barray[index]) | |
self.rluxes = np.append(self.rluxes, \ | |
rarray[index]) | |
self.coords = np.append(self.coords, [index], \ | |
axis=0) | |
forest[index] = self.no | |
if farray[index] > self.bright: | |
self.bright = farray[index] | |
self.bcoord = [index] | |
indices.append(index) | |
elif np.isnan(forest[index]) and \ | |
(farray[index] < self.gh*rarray[index]) or \ | |
(farray[index] >= self.mh*rarray[index]): | |
forest[index] = 0 | |
farray[index] = np.nan | |
if not boundary: | |
self.bounds = np.append(self.bounds, \ | |
[(i, j)], axis=0) | |
boundary = True | |
elif forest[index] == 0: | |
if not boundary: | |
self.bounds = np.append(self.bounds, \ | |
[(i, j)], axis=0) | |
boundary = True | |
pass | |
except IndexError: | |
pass | |
tree_number = self.no + 1 | |
else: | |
tree_number = self.no | |
forest[m, n] = 0 | |
farray[m, n] = np.nan | |
return farray, forest, tree_number | |
def populate_forest(farray, warray, rarray, threshold, max_pix, min_pix, \ | |
diagonals, barray): | |
"""Grow trees in a forest; find sources in a field.""" | |
# An empty forest: | |
forest = np.empty((len(farray[:, 0]), len(farray[0, :]))) | |
forest[:] = np.nan | |
tree_number = 1 # The current source ID, | |
tree_leaves = {tree_number: 0} # its pixels, | |
tree_fluxes = {tree_number: 0} # its flux values, | |
tree_coords = {tree_number: 0} # its pixel coordinates, | |
tree_bounds = {tree_number: 0} # its boundary coordinates, | |
tree_bright = {tree_number: 0} # Source brightest pixel coordinates. | |
tree_height = {tree_number: 0} | |
tree_rms = {tree_number: 0} # Sum of rms. | |
tree_bpp = {tree_number: 0} | |
for n in range(0, len(farray[0, :])): | |
sys.stdout.write(u"\u001b[1000D" + "{:.>6.1f}%".format(100.*n/len(farray[0, :]))) | |
sys.stdout.flush() | |
for m in range(0, len(farray[:, 0])): | |
# If forest[m, n] is not NaN, it has already been checked. | |
if np.isnan(forest[m, n]): | |
t = Tree(tree_number, threshold) | |
farray, forest, tree_number = t.seedling(m, n, farray, \ | |
warray, rarray, forest, diagonals, barray) | |
try: | |
if (min_pix <= t.leaves <= max_pix): | |
tree_leaves[tree_number-1] = t.leaves | |
tree_fluxes[tree_number-1] = t.fluxes | |
tree_coords[tree_number-1] = t.coords | |
tree_bounds[tree_number-1] = t.bounds | |
tree_bright[tree_number-1] = t.bcoord | |
tree_height[tree_number-1] = t.bright | |
tree_rms[tree_number-1] = t.rluxes | |
tree_bpp[tree_number-1] = t.bpp | |
else: | |
pass | |
except AttributeError: | |
pass | |
print("") | |
return farray, forest, tree_leaves, tree_fluxes, tree_coords, tree_bounds, \ | |
tree_bright, tree_height, tree_rms, tree_bpp | |
def measure_forest(fitsimage, rms=None, cutoff1=None, \ | |
cutoff2=None, cutoff3=None, max_pix=500, min_pix=2, diagonals=True, \ | |
LAS=True, annfile=None, outfile=None, outimage=None, \ | |
verbose=True, | |
psf=None, | |
offset_correct=False, | |
threshold1=None, | |
threshold2=None): | |
"""Calculate the fluxes of individual sources within a FITS file. | |
Parameters | |
---------- | |
fitsimage : str or HDUList object | |
Either the path to a FITS file or the HDUList object if already | |
opened. | |
rms : float or str | |
Either a single rms value for the image, or a rms image mirroring | |
the input FITS file. Minimum detection threshold is `rms` * `cutoff` | |
for each pixel. | |
cutoff1 : int, optional | |
The multiple of `rms` needed for a detection. Default is 3sigma. | |
cutoff2 : int, optional | |
The multiple of `rms` needed for growing sources. Default is `cutoff1`. | |
max_pix : int, optional | |
Maximum number of pixels in a detection. Useful only to save | |
time ignoring large sources (e.g., Fornax A) as LAS calculations | |
are ridiculously slow. | |
min_pix : int, optional | |
Minimum number of pixels for a detection. | |
diagonals : bool, optional | |
Specifies whether source detection counts pixels only connected | |
diagonally. Select True for diagonal detection. | |
LAS : bool, optional | |
Select True is wanting to calculate the largest angular size of | |
each source. THIS IS VERY SLOW. | |
annfile : str, optional | |
File to write annotations to. This creates a new file or over- | |
writes an existing one. | |
outfile : str, optional | |
File to write out results to. This creates a new file or over- | |
writes an existing one. | |
outimage : str, optional | |
This allows writing a FITS file with the same header/size as | |
the input image but with pixels below the detection/growth | |
thresholds blanked. An exisiting file with this name will | |
be deleted. | |
verbose : bool, optional | |
Select True if wanting to print output to terminal. | |
""" | |
ann_opened = False | |
if outfile is not None: start_time = datetime.now() | |
# -------------------------------------------------------------------------- | |
if annfile is not None: | |
if annfile.endswith("ann") or annfile.endswith("reg"): | |
ann = open(annfile, "w+") | |
ann_opened = True | |
else: | |
logging.warn(">>> Annotation file must be for Kvis (.ann) or " \ | |
" DS9 (.reg).") | |
logging.warn(">>> No annotation file will be made.") | |
# -------------------------------------------------------------------------- | |
if threshold1 is not None: | |
cutoff1 = threshold1 | |
else: | |
if cutoff1 is None: cutoff1 = 3 | |
if threshold2 is not None: | |
cutoff2 = threshold2 | |
else: | |
if cutoff2 is None: cutoff2 = cutoff1 | |
if cutoff3 is None: cutoff3 = 1.e9 | |
logging.info(">>> Detection threshold set to {0} sigma.".format(cutoff1)) | |
logging.info(">>> Growth threshold set to ...{0} sigma.".format(cutoff2)) | |
farray, warray, bpp, cd1, cd2, naxis, beam_major, beam_minor = read_fits(fitsimage) | |
rarray = rms_array(rms, farray) | |
if psf is None: | |
logging.info(">>> using PSF {:.2f}\" times {:.2f}\"".format(beam_major*3600., | |
beam_minor*3600.)) | |
bmaj = psf_array(beam_major, farray) | |
bmin = psf_array(beam_minor, farray) | |
else: | |
logging.info(">>> using PSF map from {}".format(psf)) | |
bmaj = psf_array(psf, farray, 0) | |
bmin = psf_array(psf, farray, 1) | |
bpp_array = get_bpp(bmaj, bmin, cd1, cd2) | |
if rarray.shape != farray.shape: | |
raise ValueError(">>> RMS array {} and image array {} must be the same size.".format( | |
rarray.shape, farray.shape)) | |
farray, forest, tree_leaves, tree_fluxes, tree_coords, tree_bounds,\ | |
tree_bright, tree_height, tree_rms, tree_bpp = populate_forest( | |
farray=farray, \ | |
warray=warray, rarray=rarray, threshold=(cutoff1, cutoff2, cutoff3), \ | |
max_pix=max_pix, min_pix=min_pix, diagonals=diagonals, | |
barray=bpp_array) | |
if (len(tree_leaves) == 1) and (tree_leaves[1] == 0): | |
raise RuntimeError(">>> No sources detected for {0} with threshold = "\ | |
"{1} sigma.".format(fitsimage, cutoff1)) | |
zero_flag = False | |
if tree_leaves[1] == 0: | |
zero_flag = True | |
del tree_leaves[1] | |
del tree_fluxes[1] | |
del tree_coords[1] | |
del tree_bounds[1] | |
del tree_bright[1] | |
del tree_height[1] | |
source_flux = [] | |
source_dflux = [] | |
source_peak = [] | |
source_centroid = [] | |
source_avg_flux = [] | |
source_avg_rms = [] | |
source_bcoord = [] | |
source_area = [] | |
source_npix = [] | |
source_LAS = [] | |
source = [] | |
source_boundaries = [] | |
source_sum = [] | |
if offset_correct: | |
hdr = get_header(fitsimage) | |
if "BIASA" in hdr.keys() and "BIASB" in hdr.keys(): | |
logging.info("BIASA: {}, BIASB: {}".format(hdr["BIASA"], hdr["BIASB"])) | |
else: | |
logging.warning("No BIASA and/or BIASB parameters for offset corrections.") | |
offset_correct = False | |
for tree in tree_leaves: | |
if not np.isfinite(tree): | |
continue | |
flux_i = sum(tree_fluxes[tree] / tree_bpp[tree]) | |
peak_i = tree_height[tree] | |
rms_i = sum(tree_rms[tree]) / tree_leaves[tree] | |
if offset_correct: | |
peak_bias = (peak_i/rms_i)*hdr["BIASA"] + hdr["BIASB"] | |
int_bias = (peak_bias / peak_i)*flux_i | |
flux_i += int_bias | |
source_sum.append(np.nansum(tree_fluxes[tree])) | |
source_flux.append(flux_i) | |
source_dflux.append(rms_i * np.sqrt(np.nanmean(tree_bpp[tree]) / | |
float(tree_leaves[tree]))) | |
source_avg_flux.append(sum(tree_fluxes[tree]) / tree_leaves[tree]) | |
source_avg_rms.append(rms_i) | |
source_area.append(abs(cd1*cd2) * tree_leaves[tree]) | |
source_npix.append(tree_leaves[tree]) | |
source_peak.append(peak_i) | |
source_bcoord.append((tree_bright[tree][0][1], tree_bright[tree][0][0])) | |
fx, fy = [], [] | |
fz = sum(tree_fluxes[tree]) | |
for i in range(len(tree_fluxes[tree])): | |
fx.append(tree_coords[tree][i][0] * tree_fluxes[tree][i]) | |
fy.append(tree_coords[tree][i][1] * tree_fluxes[tree][i]) | |
source_centroid.append((sum(fx) / fz, sum(fy) / fz)) | |
# The largest angular scale of the source. | |
# This is simply calculated as the largest angular separation between | |
# any two pixels that comprise the source. | |
if LAS: | |
length = 0 | |
for pc1 in range(len(tree_bounds[tree])): | |
ra1, dec1 = pix_to_world(tree_bounds[tree][pc1][1], \ | |
tree_bounds[tree][pc1][0], \ | |
warray) | |
for pc2 in range(pc1+1, len(tree_bounds[tree])): | |
ra2, dec2 = pix_to_world(tree_bounds[tree][pc2][1], \ | |
tree_bounds[tree][pc2][0], \ | |
warray) | |
if (ra1 == ra2) and (dec1 == dec2): | |
diff = 0.0 | |
else: | |
diff = angular_distance((ra1, dec1), (ra2, dec2)) | |
if diff > length: | |
length = diff | |
source_LAS.append(length) | |
else: | |
source_LAS.append(np.nan) | |
# ---------------------------------------------------------------------- | |
if ann_opened: | |
ord_coord = order_boundaries(tree_bounds[tree]) | |
if annfile.endswith("ann"): | |
ann.write("COLOR RED\nCOORD W\n") | |
ann.write("CLINES ") | |
elif annfile.endswith("reg"): | |
ann.write("global color=yellow\nfk5\n") | |
ann.write("polygon ") | |
for i in range(len(ord_coord)): | |
r_dd, d_dd = pix_to_world(ord_coord[i][0], \ | |
ord_coord[i][1], warray) | |
ann.write("{0} {1} ".format(r_dd, d_dd)) | |
ann.write("\n") | |
# ---------------------------------------------------------------------- | |
if zero_flag: source.append(tree-1) | |
else: source.append(tree) | |
wx, wy, bx, by = [], [], [], [] | |
for i in range(len(source)): | |
wx.append(source_centroid[i][0]) | |
wy.append(source_centroid[i][1]) | |
bx.append(source_bcoord[i][0]) | |
by.append(source_bcoord[i][1]) | |
world_coords = pix_to_world(wx, wy, warray) | |
bright_coords = pix_to_world(bx, by, warray) | |
if outfile is not None: | |
if outfile.lower().endswith(".fits"): | |
source_ra = [world_coords[0][i] for i in range(len(source))] | |
source_dec = [world_coords[1][i] for i in range(len(source))] | |
bright_ra = [bright_coords[0][i] for i in range(len(source))] | |
bright_dec = [bright_coords[1][i] for i in range(len(source))] | |
names = ["source", "int_flux", "err_int_flux", "avg_flux", | |
"local_rms", "area", "npix", "ra", "dec", "pra", "pdec", | |
"las", "peak_flux", "sum"] | |
write_fits_table(outfile, names, source, source_flux, source_dflux, | |
source_avg_flux, source_avg_rms, source_area, source_npix, | |
source_ra, source_dec, bright_ra, bright_dec, | |
source_LAS, source_peak, source_sum) | |
else: | |
if outfile[-3:] == "csv": | |
de = "," | |
else: de = " " | |
with open(outfile, "w+") as f: | |
# f.write(" # Sources and their parameters found by `measure_forest`:\n"\ | |
# " #\n" \ | |
# " # ............ Input FITS = {0}\n" \ | |
# " # ................ Output = {1}\n" \ | |
# " # .... Detection treshold = {2}\n" \ | |
# " # ...... Growth threshold = {3}\n" \ | |
# " # ........ Minimum pixels = {4}\n" \ | |
# " # ........ Maximum pixels = {5}\n" \ | |
# " # Total number of sources = {6}\n" \ | |
# " # ............ Time taken = {7}\n" \ | |
# " # ................... LAS = {8}\n" \ | |
# " # \n" \ | |
# " # source{9}flux{9}dflux{9}avg_flux{9}avg_rms{9}area{9}npix{9}centroid_RA{9}centroid_DEC{9}bright_RA{9}bright_DEC{9}LAS{9}peak{9}sum\n" \ | |
# " # Jy Jy Jy/Beam deg^2 Jy/Beam deg deg deg deg deg Jy/beam Jy/beam\n" \ | |
# " # 0 1 2 3 4 5 6 7 8 9 10 11 12 13\n" \ | |
# .format(fitsimage, outfile, cutoff1, cutoff2, min_pix, \ | |
# max_pix, len(source), datetime.now()-start_time, LAS, de)) | |
f.write("source{0}flux{0}dflux{0}avg_flux{0}avg_rms{0}area{0}npix{0}centroid_RA{0}centroid_DEC{0}bright_RA{0}bright_DEC{0}LAS{0}peak{0}sum\n".format(de)) | |
for i in range(len(source)): | |
f.write("{0}{11}{1}{11}{2}{11}{3}{11}{4}{11}{5}{11}{6}{11}{7}" \ | |
"{11}{8}{11}{9}{11}{10}{11}{12}{11}{13}{11}{14}\n".format(source[i], \ | |
source_flux[i], source_dflux[i], source_avg_flux[i], \ | |
source_avg_rms[i], source_area[i], source_npix[i], \ | |
world_coords[0][i], world_coords[1][i], \ | |
bright_coords[0][i], bright_coords[1][i], de, \ | |
source_LAS[i], source_peak[i], source_sum[i])) | |
# -------------------------------------------------------------------------- | |
if ann_opened: | |
if annfile.endswith("reg"): | |
for i in range(len(source)): | |
ann.write(r"text {0} {1} {{0}}\\n".format(\ | |
world_coords[0][i], world_coords[1][i], \ | |
source[i])) | |
elif annfile.endswith("ann"): | |
for i in range(len(source)): | |
ann.write("TEXT {0} {1} {2}\n".format( \ | |
world_coords[0][i], world_coords[1][i], \ | |
source[i])) | |
ann.close() | |
# -------------------------------------------------------------------------- | |
if outimage is not None: | |
if isinstance(fitsimage, str): | |
hdulist = fits.open(fitsimage) | |
elif isinstance(fitsimage, fits.HDUList): | |
hdulist = fitsimage | |
hdulist[0].data = farray | |
if os.path.exists(outimage): | |
logging.warn(">>> Deleting old {0}...".format(outimage)) | |
os.remove(outimage) | |
hdulist[0].header["HISTORY"] = "Output image from `fluxtools.py`." | |
hdulist.writeto(outimage) | |
hdulist.close() | |
if len(source) == 1: | |
world_coords, bright_coords = ([world_coords[0]], [world_coords[1]]), \ | |
([bright_coords[0]], [bright_coords[1]]) | |
return np.array([source, source_flux, source_dflux, source_avg_flux, source_avg_rms, source_area, \ | |
source_npix, world_coords[0], world_coords[1], bright_coords[0], bright_coords[1], | |
source_LAS, source_peak, source_sum]).T | |
def measure_tree(fitsimage, coords, rms, cutoff1=3, cutoff2=None, max_pix=500, \ | |
min_pix=2, max_sep=1.0, diagonals=True, LAS=True, annfile=None, \ | |
outfile=None, \ | |
outimage=None, verbose=False, | |
use_existing=None, | |
psf=None, | |
offset_correct=False, | |
threshold1=None, | |
threshold2=None,): | |
"""Calculate the fluxes of an individual. | |
Parameters | |
---------- | |
fitsimage : str or HDUList object | |
Either the path to a FITS file or the HDUList object if already | |
opened. | |
coords : tuple of floats | |
Coordinates of source of interest. Should be in decimal degrees. | |
rms : float or str | |
Either a single rms value for the image, or a rms image mirroring | |
the input FITS file. Minimum detection threshold is `rms` * `cutoff` | |
for each pixel. | |
cutoff1 : int, optional | |
The multiple of `rms` needed for a detection. Default is 3sigma. | |
cutoff2 : int, optional | |
The multiple of `rms` needed for growing sources. Default is `cutoff1`. | |
max_pix : int, optional | |
Maximum number of pixels in a detection. Useful only to save | |
time ignoring large sources (e.g., Fornax A) as LAS calculations | |
are ridiculously slow. | |
min_pix : int, optional | |
Minimum number of pixels for a detection. | |
diagonals : bool, optional | |
Specifies whether source detection counts pixels only connected | |
diagonally. Select True for diagonal detection. | |
LAS : bool, optional | |
Select True is wanting to calculate the largest angular size of | |
each source. THIS IS VERY SLOW. | |
annfile : str, optional | |
File to write annotations to. This creates a new file or over- | |
writes an existing one. | |
outfile : str, optional | |
File to write out results to. This creates a new file or over- | |
writes an existing one. | |
outimage : str, optional | |
This allows writing a FITS file with the same header/size as | |
the input image but with pixels below the detection/growth | |
thresholds blanked. An exisiting file with this name will | |
be deleted. | |
verbose : bool, optional | |
Select True if wanting to print output to terminal. | |
""" | |
params = None | |
if use_existing is not None: | |
if os.path.exists(use_existing): | |
if use_existing.endswith(".csv"): | |
params = np.genfromtxt(use_existing, delimiter=",", skip_header=1) | |
if params is None: | |
# A forest in which our tree resides: | |
params = measure_forest(fitsimage=fitsimage, | |
rms=rms, | |
cutoff1=cutoff1, | |
cutoff2=cutoff2, | |
max_pix=max_pix, | |
min_pix=min_pix, \ | |
diagonals=diagonals, | |
LAS=LAS, | |
annfile=annfile, | |
outfile=outfile, | |
outimage=outimage, | |
verbose=verbose, | |
psf=psf, | |
offset_correct=offset_correct, | |
threshold1=threshold1, | |
threshold2=threshold2) | |
source = params[:, 0] | |
source_flux = params[:, 1] | |
source_dflux = params[:, 2] | |
source_avg_flux = params[:, 3] | |
source_avg_rms = params[:, 4] | |
source_area = params[:, 5] | |
source_npix = params[:, 6] | |
world_coords_ra = params[:, 7] | |
world_coords_dec = params[:, 8] | |
bright_coords_ra = params[:, 9] | |
bright_coords_dec = params[:, 10] | |
source_LAS = params[:, 11] | |
source_peak = params[:, 12] | |
# source, source_flux, source_dflux, source_avg_flux, source_avg_rms, source_area, \ | |
# source_npix, world_coords_ra, world_coords_dec, bright_coords_ra, bright_coords_dec, \ | |
# source_LAS, source_peak = \ | |
world_coords = (world_coords_ra, world_coords_dec) | |
bright_coords = (bright_coords_ra, bright_coords_dec) | |
# Now to find the tree: | |
c = SkyCoord(coords[0], coords[1], unit=(u.deg, u.deg)) | |
ww_catalogue = SkyCoord(world_coords[0], world_coords[1], unit=(u.deg, u.deg)) | |
if len(source) != 1: | |
i = c.match_to_catalog_sky(ww_catalogue)[0] | |
else: | |
i = 0 | |
dist = angular_distance((coords[0], coords[1]), \ | |
(world_coords[0][i], world_coords[1][i])) | |
if dist > max_sep: | |
raise RuntimeError("No sources found within `max_sep`.") | |
if verbose: | |
if dist < 1.0: | |
dist_print = dist * 60.0 | |
dist_unit = "arcmin" | |
if dist_print < 1.0: | |
dist_print *= 60.0 | |
dist_unit = "arcsec" | |
else: | |
dist_print = dist | |
dist_unit = "deg" | |
print(">>> The following parameters have been found:\n" \ | |
".............. Source no. = {11}\n" \ | |
"............... Int. flux = {0} [Jy]\n" \ | |
"......... Error int. flux = {1} [Jy]\n" \ | |
"............... Peak flux = {2} [Jy/beam]\n" \ | |
"............... Avg. flux = {3} [Jy/beam]\n" \ | |
".............. No. pixels = {4}\n" \ | |
"Flux weighted coordinates = ({5}, {6})\n" \ | |
"..................... LAS = {7} [deg]\n" \ | |
"............. Source area = {8} [deg^2]\n" \ | |
"....... Offset from input = {9} [{10}]".format(\ | |
source_flux[i], source_dflux[i], source_peak[i], \ | |
source_avg_flux[i], source_npix[i], world_coords[0][i], \ | |
world_coords[1][i], source_LAS[i], source_area[i], dist_print, \ | |
dist_unit, source[i])) | |
return source[i], source_flux[i], source_dflux[i], source_peak[i], \ | |
source_avg_flux[i], source_npix[i], (world_coords[0][i], \ | |
world_coords[1][i]), source_LAS[i], \ | |
source_area[i], (bright_coords[0][i], bright_coords[1][i]) | |
# | |
def measure_aperture(fitsimage, coords, radius, rms=None, sigma=3, LAS=False, \ | |
verbose=True, radius_units="deg", z=0, psf=None, | |
offset_correct=True, absolute_flux=False, | |
writeout_image=False, | |
blob_correct=False, | |
): | |
"""Measure flux within a circular aperture. | |
Parameters | |
---------- | |
fitsfimage : string or astropy.io.fits.HDUList | |
If string this should be the filename and path to a FITS file. | |
RA : float | |
Central RA in decimal degrees. | |
DEC : float | |
Central DEC in decimal degrees. | |
radius : float | |
Aperture within which to calculate flux in degree. | |
rms : float or str | |
Either a single rms value for the image, or a rms image mirroring | |
the input FITS file. Minimum detection threshold is `rms` * `cutoff` | |
cutoff : int, optional | |
Multiple of the RMS required for detection. Default is 3. | |
LAS : bool, optional | |
If True an LAS is calculated. | |
verbose : bool, optional | |
If True results are printed to the terminal. | |
radius_units : {'deg', 'arcmin', 'arcsec'}, optional | |
Specifies the unit for `radius`. | |
""" | |
if verbose: | |
print(">>> running measure_aperture on {} with rms {}".format(fitsimage, rms)) | |
farray, warray, bpp, cd1, cd2, naxis, beam_major, beam_minor = read_fits(fitsimage) | |
if rms is not None: | |
rarray = rms_array(rms, farray) | |
rarray = np.squeeze(rarray) | |
if psf is None: | |
if verbose: | |
logging.info(">>> using PSF {:.2f}\" times {:.2f}\"".format(beam_major*3600., | |
beam_minor*3600.)) | |
bmaj = psf_array(beam_major, farray) | |
bmin = psf_array(beam_minor, farray) | |
else: | |
if verbose: | |
logging.info(">>> using PSF map from {}".format(psf)) | |
bmaj = psf_array(psf, farray, 0) | |
bmin = psf_array(psf, farray, 1) | |
# set pixel scaling so we can strip projection-related effects? | |
bpp_array = get_bpp(bmaj, bmin, cd1, cd2) | |
farray = np.squeeze(farray) | |
r = (radius * units[radius_units]).to(u.degree).value | |
scale = int((r / abs(cd1)) + 10) | |
yr, xr = warray.celestial.all_world2pix(coords[0], coords[1], 0) | |
yr = int(yr) | |
xr = int(xr) | |
if absolute_flux: | |
farray = np.absolute(farray) | |
source_flux, source_rms, source_xpixel, source_ypixel, source_coords = \ | |
[], [], [], [], [] | |
source_bpp = [] | |
source_peak = 0 | |
x_range = [xr-scale, xr+scale] | |
y_range = [yr-scale, yr+scale] | |
scale_pix = scale - 10 | |
dist = norm_method(farray, (xr, yr)) | |
cond1 = dist <= scale_pix | |
if rms is not None: | |
cond2 = farray >= sigma*rarray | |
else: | |
cond2 = True | |
indices = np.where(cond1 & cond2) | |
indices2 = np.where(~cond1 | ~cond2) | |
if writeout_image: | |
farray[indices2] = np.nan | |
fits.writeto(fitsimage.replace(".fits", "_ww.fits"), | |
farray, | |
fits.getheader(fitsimage), | |
overwrite=True) | |
source_flux = farray[indices].flatten() | |
source_bpp = bpp_array[indices].flatten() | |
if rms is not None: | |
source_rms = rarray[indices].flatten() | |
source_xpixel = indices[0].flatten() | |
source_ypixel = indices[1].flatten() | |
source_peak = np.nanmax(source_flux) | |
int_flux = sum(source_flux / source_bpp) | |
if rms is not None: | |
unc_flux = (sum(source_rms / source_bpp) * | |
np.sqrt(np.nanmean(source_bpp) / float(len(source_rms)))) | |
else: | |
unc_flux = 0.0 | |
area = len(source_flux) * abs(cd1*cd2) | |
npix = len(source_flux) | |
on_source_rms = np.sqrt((np.mean(np.asarray(source_flux)**2))) | |
rms = np.mean(source_rms) | |
if offset_correct: | |
hdr = get_header(fitsimage) | |
if "BIASA" in hdr.keys() and "BIASB" in hdr.keys(): | |
peak_bias = (source_peak/rms)*hdr["BIASA"] + hdr["BIASB"] | |
# peak_bias = hdr["BIASB"] | |
logging.info("peak bias: {} Jy/beam".format(peak_bias)) | |
int_bias = (peak_bias / source_peak)*int_flux | |
logging.info("integrated flux density bias: {} Jy".format(int_bias)) | |
int_flux += int_bias | |
LAS = False # none of this crap | |
if LAS: | |
las = 0 | |
for i in range(len(source_coords)): | |
for j in range(len(source_coords)): | |
if angular_distance(source_coords[i], source_coords[j]) > las: | |
las = angular_distance(source_coords[i], source_coords[j]) | |
else: | |
las = "NA" | |
if blob_correct: | |
# https://doi.org/10.1111/j.1365-2966.2012.21373.x | |
snr = source_peak / rms | |
eta = (erf(np.sqrt(-np.log((sigma / snr)))))**2 | |
print(">>> eta = {}".format(eta)) | |
print(">>> flux from {} to {}".format(int_flux, int_flux/eta)) | |
int_flux /= eta | |
if verbose: | |
print(">>> The following parameters have been found:\n" \ | |
"............... Int. flux = {0} [Jy]\n" \ | |
"......... Error int. flux = {1} [Jy]\n" \ | |
"............... Peak flux = {2} [Jy/beam]\n" \ | |
".............. No. pixels = {3}\n" \ | |
"..................... LAS = {4} [deg]\n" \ | |
"............. Source area = {5} [deg^2]\n" \ | |
"........... On source rms = {7} [Jy/beam]\n" \ | |
"..................... rms = {6} [Jy/beam]".format(\ | |
int_flux, unc_flux, source_peak, npix, las, area, rms, on_source_rms)) | |
if np.isnan(unc_flux): | |
unc_flux = 0. | |
return int_flux, unc_flux, source_peak, npix, las, area, rms | |
def measure_region(fitsimage, rms, region, r_index=0, sigma=3, annfile=None, \ | |
annfile_color="yellow", verbose=True, pix_buff="A", | |
psf=None, offset_correct=True, | |
absolute_flux=False, | |
clean_bias_factor=1., | |
clean_bias_std=0., | |
model_map=None, | |
residual_map=None, | |
dOmega_map=None, | |
full_region_error=False, | |
blob_correct=False): | |
"""Measure integrated flux within polygon region. | |
Requires pyregion if using ds9 region file. Inspired by `radioflux.py` by | |
Martin Hardcastle: https://github.com/mhardcastle/radioflux, but I wanted a bit | |
more control over errors and additional MWA-specific corrections. | |
This adapts a C++ implementation found here: | |
http://www.geeksforgeeks.org/how-to-check-if-a-given-point-lies-inside-a-polygon/ | |
Note that region vertices must be ordered. | |
Parameters | |
---------- | |
fitsfimage : string or astropy.io.fits.HDUList | |
If string this should be the filename and path to a FITS file. | |
rms : float or str | |
Either a single rms value for the image, or a rms image mirroring | |
the input FITS file. Minimum detection threshold is `rms` * `cutoff` | |
region : str, list, or np.ndarray | |
If a string, this should be a filepath to a ds9.reg file. If | |
a list or array, these should be ordered vertices of a polygon | |
in world coordinates of the image. | |
r_index : int, optional | |
Specifies the index of the polygon if using a ds9.reg file. | |
sigma : int, optional | |
Multiple of the RMS required for measurement. Default is 3. | |
annfile : str, optional | |
File to write annotations to. This creates a new file or over- | |
writes an existing one. The annotations here are just dots | |
indicating each pixel that is measured. | |
annfile_color: str, optional | |
Color of the annotations. Default is `yellow`. | |
verbose : bool, optional | |
If True, additional output is printed to the terminal. | |
""" | |
def is_in(x, y, region_x, region_y, max_x): | |
"""Check if (x, y) is in region. | |
(x, y) is considered in region if the followed is met: | |
A line drawn horizontally to the right from (x, y) intersects | |
with an odd number of region edges. | |
""" | |
def orientation(p1, p2, p3): | |
"""Get orientation of ordered triplet. | |
0 == collinear | |
1 == clockwise | |
2 == counter-clockwise | |
""" | |
slope = (p2[1] - p1[1]) * (p3[0] - p2[0]) - \ | |
(p3[1] - p2[1]) * (p2[0] - p1[0]) | |
if slope < 0: | |
orient = 2 | |
elif slope > 0: | |
orient = 1 | |
else: | |
orient = 0 | |
return orient | |
def on_segment(p1, p2, p3): | |
"""Checks if p3 lies on segment (p1, p2).""" | |
if ((p3[0] <= max(p1[0], p2[0])) and (p3[0] >= min(p1[0], p2[0])) and\ | |
(p3[2] <= max(p1[1], p2[1])) and (p3[1] >= min(p1[1], p2[1]))): | |
return True | |
else: | |
return False | |
def intersects(p1, q1, p2, q2): | |
"""Determine if line segment (p1, q1) intersects with (p2, q2) | |
Uses orientation conditions. | |
""" | |
inter = False | |
# General case: | |
if (orientation(p1, q1, p2) != orientation(p1, q1, q2)) and \ | |
(orientation(p2, q2, p1) != orientation(p2, q2, q1)): | |
inter = True | |
# Special cases: | |
if (orientation(p1, q1, p2) == 0) and (on_segment(p1, q1, p2)): | |
inter = True | |
if (orientation(p1, q1, q2) == 0) and (on_segment(p1, q1, q2)): | |
inter = True | |
if (orientation(p2, q2, p1) == 0) and (on_segment(p2, q2, p1)): | |
inter = True | |
if (orientation(p2, q2, q1) == 0) and (on_segment(p2, q2, q1)): | |
inter = True | |
return inter | |
coords_in_region = [] | |
for i in range(len(x)): | |
p1, q1 = (x[i], y[i]), (x[i]+max_x, y[i]) | |
intersections = 0 | |
for j in range(len(region_x)): | |
p2 = (region_x[j], region_y[j]) | |
if j == len(region_x)-1: | |
q2 = (region_x[0], region_y[0]) | |
else: | |
q2 = (region_x[j+1], region_y[j+1]) | |
if intersects(p1, q1, p2, q2): | |
intersections += 1 | |
if intersections%2 == 0: | |
in_region = False | |
else: | |
in_region = True | |
coords_in_region.append(in_region) | |
return coords_in_region | |
sigma = float(sigma) | |
# | |
ra, dec = [], [] | |
if isinstance(region, str): | |
if region.endswith(".reg"): | |
import pyregion | |
r = pyregion.open(region)[r_index].coord_list | |
for i in range(0, len(r), 2): | |
ra.append(r[i]) | |
dec.append(r[i+1]) | |
elif region.endswith(".ann"): | |
poly_coords = kvis_polygon(region) | |
ra = [coord[0] for coord in poly_coords] | |
dec = [coord[1] for coord in poly_coords] | |
else: | |
raise TypeError(">>> `region` must be one of: DS9.reg, Kvis.ann, or a " \ | |
" list of coordinate tuples.") | |
else: | |
for vertex in region: | |
ra.append(vertex[0]) | |
dec.append(vertex[1]) | |
farray, warray, bpp, cd1, cd2, naxis, beam_major, beam_minor = read_fits(fitsimage) | |
if rms is not None: | |
rarray = rms_array(rms, farray) | |
rarray = np.squeeze(rarray) | |
if psf is None: | |
logging.info(">>> using PSF {:.2f}\" times {:.2f}\"".format(beam_major*3600., | |
beam_minor*3600.)) | |
bmaj = psf_array(beam_major, farray) | |
bmin = psf_array(beam_minor, farray) | |
else: | |
logging.info(">>> using PSF map from {}".format(psf)) | |
bmaj = psf_array(psf, farray, 0) | |
bmin = psf_array(psf, farray, 1) | |
# set pixel scaling so we can strip projection-related effects? | |
if model_map is not None: | |
model_array = np.squeeze(fits.getdata(model_map)) | |
if dOmega_map is not None: | |
dOmega_array = np.squeeze(fits.getdata(dOmega_map)) | |
else: | |
dOmega_array = np.ones_like(model_array) | |
if residual_map is not None: | |
residual_array = fits.getdata(residual_map) | |
bpp_array = get_bpp(bmaj, bmin, cd1, cd2) | |
max_x = len(farray[:, 0]+1) | |
if pix_buff in ["a", "A", "auto", "Auto", "AUTO"]: | |
pix_buff = int(0.1 * farray.shape[-2]) | |
pix_r, pix_d = world_to_pix(ra, dec, warray, naxis) | |
min_x, max_x = min(pix_r), max(pix_r) | |
min_y, max_y = min(pix_d), max(pix_d) | |
avg_rx, avg_dy = int(np.mean(pix_r)), int(np.mean(pix_d)) | |
x_all, y_all, f_all, r_all, b_all = [], [], [], [], [] | |
bmaj_all, bmin_all = [], [] | |
model_all = [] | |
residual_all = [] | |
dOmega_all = [] | |
xlim1 = int(min_x-1) - pix_buff | |
xlim2 = int(max_x+1) + pix_buff | |
ylim1 = int(min_y-1) - pix_buff | |
ylim2 = int(max_y+1) + pix_buff | |
if xlim1 < 0: | |
xlim1 = 0 | |
if ylim1 < 0: | |
ylim1 = 0 | |
if xlim2 > len(farray[:, 0]-1): | |
xlim1 = len(farray[:, 0]-1) | |
if ylim2 > len(farray[:, 1]-1): | |
ylim2 = len(farray[:, 1]-1) | |
if absolute_flux: | |
farray = np.absolute(farray) | |
for i in range(xlim1, xlim2): | |
for j in range(ylim1, ylim2): | |
x_all.append(i) | |
y_all.append(j) | |
f_all.append(farray[i, j]) | |
r_all.append(rarray[i, j]) | |
b_all.append(bpp_array[i, j]) | |
bmaj_all.append(bmaj[i, j]) | |
bmin_all.append(bmin[i, j]) | |
if model_map is not None: | |
model_all.append(model_array[i, j]) | |
dOmega_all.append(dOmega_array[i, j]) | |
if residual_map is not None: | |
residual_all.append(residual_array[i, j]) | |
cir = is_in(x_all, y_all, pix_r, pix_d, max_x) | |
source_flux, source_rms, source_bpp = [], [], [] | |
final_ra, final_dec = [], [] | |
model_flux = [] | |
model_dOmega = [] | |
residual_flux = [] | |
all_rms = [] | |
all_bpp = [] | |
faint_flux1, faint_rms, faint_bpp1 = [], [], [] | |
faint_ra, faint_dec = [], [] | |
faint_flux2, faint_bpp2 = [], [] | |
source_bmaj, source_bmin = [], [] | |
for i in range(len(cir)): | |
if cir[i] and (f_all[i] >= sigma*r_all[i]): | |
source_flux.append(f_all[i]) | |
source_rms.append(r_all[i]) | |
source_bpp.append(b_all[i]) | |
ra_i, dec_i = pix_to_world(x_all[i], y_all[i], warray) | |
final_ra.append(ra_i), final_dec.append(dec_i) | |
source_bmaj.append(bmaj_all[i]) | |
source_bmin.append(bmin_all[i]) | |
# if f_all[i] < clean_sigma1*clean_threshold: | |
# faint_flux1.append(f_all[i]) | |
# faint_rms.append(r_all[i]) | |
# faint_bpp1.append(b_all[i]) | |
# faint_ra.append(ra_i) | |
# faint_dec.append(dec_i) | |
# else: | |
# faint_flux2.append(f_all[i]) | |
# faint_bpp2.append(b_all[i]) | |
if model_map is not None: | |
model_flux.append(model_all[i]) | |
model_dOmega.append(dOmega_all[i]) | |
if residual_map is not None: | |
residual_flux.append(residual_all[i]) | |
all_rms.append(r_all[i]) | |
all_bpp.append(b_all[i]) | |
elif cir[i]: | |
all_rms.append(r_all[i]) | |
all_bpp.append(b_all[i]) | |
print(len(all_rms)) | |
print(len(source_rms)) | |
if annfile is not None: | |
if annfile.endswith(".ann"): | |
with open(annfile, "w+") as g: | |
g.write("colour {0}\n".format(annfile_color)) | |
for i in range(len(final_ra)): | |
g.write("DOT W {0} {1}\n".format(final_ra[i], final_dec[i])) | |
g.write("colour magenta\n") | |
for i in range(len(faint_ra)): | |
g.write("CIRCLE W {0} {1} {2}\n".format(faint_ra[i], faint_dec[i], cd2/3.)) | |
if len(source_flux) == 0: | |
print("No flux above sigma*rms. No measurements done.") | |
return None | |
# raise RuntimeError(">>> No flux above sigma*rms. No measurements done.") | |
else: | |
source_flux = np.asarray(source_flux) | |
source_bpp = np.asarray(source_bpp) | |
source_rms = np.asarray(source_rms) | |
int_flux = sum(source_flux / source_bpp) | |
unc_flux = (sum(source_rms / source_bpp) * | |
np.sqrt(np.nanmean(source_bpp) / float(len(source_rms)))) | |
print(">>> error on pixels above {} sigma: {} Jy".format(sigma, unc_flux)) | |
all_rms = np.asarray(all_rms) | |
all_bpp = np.asarray(all_bpp) | |
# unc_flux = (sum(np.asarray(all_rms) / np.asarray(all_bpp) * np.sqrt(np.nanmean(all_bpp) / float(len(all_rms)))) | |
unc_flux_all = (sum(all_rms / all_bpp) * | |
np.sqrt(np.nanmean(all_bpp) / float(len(all_rms)))) | |
print(">>> error on all pixels in region: {} Jy".format(unc_flux_all)) | |
if full_region_error: | |
unc_flux = unc_flux_all | |
avg_rms = np.nanmean(source_rms) | |
area = len(source_flux) * abs(cd1*cd2) | |
npix = len(source_flux) | |
source_peak = np.nanmax(source_flux) | |
source_bmaj = np.nanmax(source_bmaj) | |
source_bmin = np.nanmax(source_bmin) | |
int_bias = 0. | |
if offset_correct: | |
hdr = get_header(fitsimage) | |
if "BIASA" in hdr.keys() and "BIASB" in hdr.keys(): | |
peak_bias = (source_peak/avg_rms)*hdr["BIASA"] + hdr["BIASB"] | |
logging.info("peak bias: {} Jy/beam".format(peak_bias)) | |
int_bias = (peak_bias / source_peak)*int_flux | |
logging.info("integrated flux density bias: {} Jy".format(int_bias)) | |
int_flux += int_bias | |
total_model_flux = 0. | |
residual_err = 0. | |
if model_map is not None: | |
total_model_flux, residual_err = dirtybias_correction(model_flux=model_flux, | |
residual_flux=residual_flux, | |
source_bpp=source_bpp, | |
clean_bias_factor=clean_bias_factor, | |
clean_bias_std=clean_bias_std) | |
total_model_flux += int_bias | |
if blob_correct: | |
# https://doi.org/10.1111/j.1365-2966.2012.21373.x | |
eta, err_eta = gaussian_blob_correction(source_peak, avg_rms, sigma) | |
print(">>> eta = {}".format(eta)) | |
print(">>> flux from {} to {}".format(int_flux, int_flux/eta)) | |
int_flux /= eta | |
unc_flux = np.sqrt(unc_flux**2 + ((err_eta/eta)*int_flux)**2) | |
total_model_flux /= eta | |
# surface brightness | |
source_sb = int_flux / area | |
unc_flux = np.sqrt(unc_flux**2 + residual_err**2) | |
if verbose: | |
print(">>> The following parameters have been found:\n" \ | |
"............... Int. flux = {0} [Jy]\n" \ | |
"......... Error int. flux = {1} [Jy]\n" \ | |
"............... Peak flux = {2} [Jy/beam]\n" \ | |
".............. No. pixels = {3}\n" \ | |
"...... Surface brightness = {6} [Jy/deg^2]\n" | |
"............. Source area = {4} [deg^2]\n" \ | |
"............. Average rms = {5} [Jy/beam]\n" \ | |
"........ Total model flux = {7} [Jy]\n".format(\ | |
int_flux, unc_flux, source_peak, npix, area, avg_rms, source_sb, total_model_flux)) | |
return int_flux, unc_flux, source_peak, npix, area, source_bmaj, source_bmin, total_model_flux, avg_rms | |
def mask(arrays, regions, wcs, fill_value=np.nan): | |
""" | |
""" | |
import pyregion | |
regions = pyregion.open(regions) | |
# assume square pixels | |
cd1 = proj_plane_pixel_scales(wcs)[0] | |
# cd2 = proj_plane_pixel_scales(wcs)[1] | |
for region in regions: | |
x, y = wcs.wcs_world2pix(region.coord_list[0], region.coord_list[1], 0) | |
r = region.coord_list[2]/cd1 | |
for array in arrays: | |
print(array.shape) | |
indices = np.indices(array.shape) | |
indices_x = indices[0].flatten() | |
indices_y = indices[1].flatten() | |
dist = np.sqrt((indices_x-x)**2 + (indices_y-y)**2) | |
array[indices_y[dist<r], indices_x[dist<r]] = fill_value | |
return arrays | |
def measure_brightness_profile(fitsimage, coords, rms, outer_radius, bin_size, | |
outname, | |
inner_radius=0.0, | |
start_angle=0.0, | |
stop_angle=360.0, | |
psf=None, | |
radial_units="arcsec", | |
brightness_units="mJy", | |
blank_below=0., | |
mask_regions=None): | |
farray, warray, bpp, cd1, cd2, naxis, beam_major, beam_minor = read_fits(fitsimage) | |
if rms is not None: | |
rarray = rms_array(rms, farray) | |
rarray = np.squeeze(rarray) | |
if psf is None: | |
logging.info(">>> using PSF {:.2f}\" times {:.2f}\"".format(beam_major*3600., | |
beam_minor*3600.)) | |
bmaj = psf_array(beam_major, farray) | |
bmin = psf_array(beam_minor, farray) | |
else: | |
logging.info(">>> using PSF map from {}".format(psf)) | |
bmaj = psf_array(psf, farray, 0) | |
bmin = psf_array(psf, farray, 1) | |
# set pixel scaling so we can strip projection-related effects? | |
bpp_array = get_bpp(bmaj, bmin, cd1, cd2) | |
data = farray # Jy/pixel | |
data[data < blank_below] = 0. | |
data_rms = rarray | |
if mask_regions is not None: | |
data = mask([data], mask_regions, warray, fill_value=0.)[0] | |
x_c, y_c = warray.all_world2pix(coords[0], coords[1], 0) | |
print((x_c, y_c)) | |
outer_radius_pix = math.ceil(outer_radius / cd2) | |
inner_radius_pix = math.floor(inner_radius / cd2) | |
x, y = np.indices(np.squeeze(data).shape) | |
x, y = y.flatten(), x.flatten() | |
r = np.sqrt((x-x_c)**2 + (y-y_c)**2) | |
print(y.shape, x.shape) | |
# north-west-south-east; clockwise on the sky | |
t = np.degrees(np.arctan2(y-y_c, x-x_c)) | |
theta = np.mod(np.degrees(np.arctan2(y-y_c, x-x_c)), 360) | |
# move from -pi/2 <= theta < pi/2 to 0 < theta < 360: | |
# theta[np.where(theta <= 0.)] = 180. - theta[np.where(theta <= 0.)] | |
# theta = np.mod(theta - 180., 360.) | |
cond1 = r >= inner_radius_pix | |
cond2 = r <= outer_radius_pix | |
cond3 = theta >= start_angle | |
cond4 = theta <= stop_angle | |
if stop_angle < start_angle: | |
indices = np.where(cond1 & (cond2 | cond3)) | |
not_indices = np.where(~cond1 | ~cond2 | (~cond3 & ~cond4)) | |
else: | |
indices = np.where(cond1 & cond2 & cond3 & cond4) | |
not_indices = np.where(~cond1 | ~cond2 | ~cond3 | ~cond4) | |
y_i, x_i = y[indices], x[indices] | |
y_n, x_n = y[not_indices], x[not_indices] | |
data[y_n, x_n] = 0. | |
data_rms[y_n, x_n] = 0. | |
import matplotlib.pyplot as plt | |
plt.close("all") | |
plt.imshow(data, vmin=0.00001, vmax=1e-3, origin="lower") | |
plt.show() | |
tbin = np.bincount(r.astype("i"), data.flatten()) | |
tbin_rms = np.bincount(r.astype("i"), data_rms.flatten()) | |
tbin_bpp = np.bincount(r.astype("i"), bpp_array.flatten()) | |
nr = np.bincount(r.astype("i")) | |
radial_profile = tbin / nr | |
# radial_profile = tbin | |
radial_profile_rms = tbin_rms / nr | |
# radial_profile_rms = tbin_rms | |
bin_size = int(bin_size/(cd2*3600.)) | |
radial_profile_binned = [] | |
radial_profile_rms_binned = [] | |
radius = [] | |
for i in range(0, len(radial_profile)-bin_size, bin_size): | |
radius.append(np.mean([i, i+bin_size])*cd2) | |
radial_profile_binned.append(np.nansum(radial_profile[i:i+bin_size])) | |
radial_profile_rms_binned.append(np.nanmean(radial_profile_rms[i:i+bin_size])) | |
radius = np.asarray(radius) | |
radial_profile_binned = np.asarray(radial_profile_binned) | |
radial_profile_rms_binned = np.asarray(radial_profile_rms_binned) | |
sector_radii = radius[int(inner_radius_pix)//bin_size:int(outer_radius_pix)//bin_size] | |
sector_profile = radial_profile_binned[int(inner_radius_pix)//bin_size:int(outer_radius_pix)//bin_size] | |
sector_rms = radial_profile_rms_binned[int(inner_radius_pix)//bin_size:int(outer_radius_pix)//bin_size] | |
with open(outname, "w+") as f: | |
f.write("#brightness,radius\n") | |
for i in range(len(sector_radii)): | |
f.write("{},{},{}\n".format(sector_radii[i], sector_profile[i], sector_rms[i])) | |
def measure_sb_profile(fitsimage, coords, rms, outer_radius, bin_size, outname, | |
inner_radius=0., | |
start_angle=0., | |
stop_angle=360., | |
psf=None, | |
blank_below=0., | |
mask_regions=None, | |
flux_scale_err=0.0): | |
farray, warray, bpp, cd1, cd2, naxis, beam_major, beam_minor = read_fits(fitsimage) | |
if rms is not None: | |
rarray = rms_array(rms, farray) | |
rarray = np.squeeze(rarray) | |
if psf is None: | |
logging.info(">>> using PSF {:.2f}\" times {:.2f}\"".format(beam_major*3600., | |
beam_minor*3600.)) | |
bmaj = psf_array(beam_major, farray) | |
bmin = psf_array(beam_minor, farray) | |
else: | |
logging.info(">>> using PSF map from {}".format(psf)) | |
bmaj = psf_array(psf, farray, 0) | |
bmin = psf_array(psf, farray, 1) | |
# set pixel scaling so we can strip projection-related effects? | |
bpp_array = get_bpp(bmaj, bmin, cd1, cd2) | |
data = farray # Jy/pixel | |
data[data < blank_below] = 0. | |
data_rms = rarray | |
if mask_regions is not None: | |
data = mask([data], mask_regions, warray, fill_value=0.)[0] | |
x_c, y_c = warray.all_world2pix(coords[0], coords[1], 0) | |
print((x_c, y_c)) | |
outer_radius_pix = math.ceil(outer_radius / cd2) | |
inner_radius_pix = math.floor(inner_radius / cd2) | |
x, y = np.indices(np.squeeze(data).shape) | |
x, y = y.flatten(), x.flatten() | |
r = np.sqrt((x-x_c)**2 + (y-y_c)**2) | |
print(y.shape, x.shape) | |
# north-west-south-east; clockwise on the sky | |
t = np.degrees(np.arctan2(y-y_c, x-x_c)) | |
theta = np.mod(np.degrees(np.arctan2(y-y_c, x-x_c)), 360) | |
# move from -pi/2 <= theta < pi/2 to 0 < theta < 360: | |
# theta[np.where(theta <= 0.)] = 180. - theta[np.where(theta <= 0.)] | |
# theta = np.mod(theta - 180., 360.) | |
cond1 = r >= inner_radius_pix | |
cond2 = r <= outer_radius_pix | |
cond3 = theta >= start_angle | |
cond4 = theta <= stop_angle | |
if stop_angle < start_angle: | |
indices = np.where(cond1 & (cond2 | cond3)) | |
not_indices = np.where(~cond1 | ~cond2 | (~cond3 & ~cond4)) | |
else: | |
indices = np.where(cond1 & cond2 & cond3 & cond4) | |
not_indices = np.where(~cond1 | ~cond2 | ~cond3 | ~cond4) | |
y_i, x_i = y[indices], x[indices] | |
y_n, x_n = y[not_indices], x[not_indices] | |
r_i = r[indices] | |
data[y_n, x_n] = np.nan | |
data_rms[y_n, x_n] = np.nan | |
import matplotlib.pyplot as plt | |
plt.close("all") | |
plt.imshow(data, vmin=0.00001, vmax=1e-3, origin="lower") | |
plt.show() | |
radii = [] | |
flux = [] | |
err = [] | |
for i in range(len(y_i)): | |
radii.append(r_i[i]) | |
flux.append(data[y_i[i], x_i[i]]) | |
err.append(data_rms[y_i[i], x_i[i]]) | |
bindata = np.array([radii, flux, err]).T | |
bindata_sorted = bindata[bindata[:, 0].argsort()] | |
bin_size = bin_size/cd2 | |
print(bin_size) | |
nbins = abs(outer_radius_pix - inner_radius_pix)//bin_size | |
bin_radii, bin_flux, bin_err = [], [], [] | |
radii = np.asarray(radii) | |
print(radii) | |
flux = np.asarray(flux) | |
err = np.asarray(err) | |
for i in range(int(nbins)): | |
bin_radii.append(((i+1)*bin_size - 0.5*bin_size)*cd2) | |
# tmp_bin_flux = np.nanmean(bindata_sorted[:, 1][np.where(bindata_sorted[:, 0] <= (i+1)*bin_size)]) | |
print((i*bin_size, (i+1)*bin_size)) | |
print(np.where((radii <= (i+1)*bin_size) & (radii > i*bin_size))[0]) | |
bin_flux.append( | |
np.mean( | |
flux[np.where((radii <= (i+1)*bin_size) & (radii > i*bin_size))[0]] | |
) | |
) | |
# bin_flux.append(tmp_bin_flux) | |
# tmp_bin_err = np.nanmean(bindata_sorted[:, 2][np.where(bindata_sorted[:, 0] <= (i+1)*bin_size)]) | |
# bin_err.append(tmp_bin_err) | |
bin_err.append( | |
np.nanmean( | |
err[np.where((radii <= (i+1)*bin_size) & (radii > i*bin_size))[0]] | |
) | |
# np.nanstd(flux[np.where((radii <= (i+1)*bin_size) & (radii > i*bin_size))[0]]) | |
) | |
for i in range(len(bin_err)): | |
bin_err[i] = np.sqrt(bin_err[i]**2 + (flux_scale_err*bin_flux[i])**2) | |
with open(outname, "w+") as f: | |
f.write("#brightness,radius\n") | |
for i in range(len(bin_radii)): | |
f.write("{},{},{}\n".format(bin_radii[i], bin_flux[i], bin_err[i])) | |
def measure_specmap(images, rms, outname): | |
""" | |
""" | |
from scipy.optimize import curve_fit | |
def powerlaw(x, a, b): | |
"""Simple powerlaw function.""" | |
return a*(x**b) | |
def powerlaw_amplitude(y, x, b): | |
return y/(x**b) | |
arrays = [] | |
freqs = [] | |
rmses = [] | |
for i in range(len(images)): | |
farray, _, _, _, _, _, _, _ = read_fits(images[i]) | |
rmses.append(np.squeeze(rms_array(rms[i], farray))) | |
harray = fits.getheader(images[i]) | |
freqs.append(np.full_like(np.squeeze(farray), find_freq(harray))) | |
arrays.append(np.squeeze(farray)) | |
if i == 0: | |
header = harray | |
arrays = np.stack(arrays) | |
freqs = np.stack(arrays) | |
rmses = np.stack(rmses) | |
print(arrays.shape) | |
print(freqs.shape) | |
print(rmses.shape) | |
alpha = np.full_like(arrays[0, ...], np.nan) | |
err_alpha = np.full_like(arrays[0, ...], np.nan) | |
for i in range(len(alpha[:, 0])): | |
for j in range(len(alpha[:, 1])): | |
popt, pcov = curve_fit(powerlaw, freqs[:, i, j], arrays[:, i, j], | |
[powerlaw_amplitude(arrays[0, i, j], freqs[0, i, j], -1), -1.], | |
absolute_sigma=True, | |
method="lm", | |
sigma=rmses[:, i, j], | |
maxfev=10000) | |
perr = np.sqrt(np.diag(pcov)) | |
alpha[i, j] = popt[1] | |
err_alpha[i, j] = perr[1] | |
fits.writeto(outname, alpha, header, overwrite=True) | |
fits.writeto(outname.replace(".fits", "_err.fits"), err_alpha, header, overwrite=True) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment