Last active
October 4, 2019 16:16
-
-
Save ipashchenko/fb7ac8ce8a072ba114fb47b464bf063b to your computer and use it in GitHub Desktop.
Find bounding box of source emission.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[1803+784] | |
n_std_lowest_contour = 1 | |
[J1800+78] | |
n_std_lowest_contour = 10 | |
plot = no | |
out_fname = other_name.txt |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import glob | |
import numpy as np | |
import pandas as pd | |
import astropy.io.fits as pf | |
import astropy.units as u | |
import matplotlib | |
import matplotlib.pyplot as plt | |
from matplotlib.patches import Ellipse | |
from astropy.stats import mad_std | |
from astropy.wcs import WCS | |
from scipy.ndimage.measurements import label | |
from scipy.ndimage.morphology import generate_binary_structure | |
from skimage.measure import regionprops | |
import argparse | |
import configparser | |
deg_to_mas = u.deg.to(u.mas) | |
label_size = 16 | |
matplotlib.rcParams['xtick.labelsize'] = label_size | |
matplotlib.rcParams['ytick.labelsize'] = label_size | |
matplotlib.rcParams['axes.titlesize'] = label_size | |
matplotlib.rcParams['axes.labelsize'] = label_size | |
matplotlib.rcParams['font.size'] = label_size | |
matplotlib.rcParams['legend.fontsize'] = label_size | |
matplotlib.rcParams["contour.negative_linestyle"] = 'dotted' | |
def find_bbox(array, level, min_maxintensity_mjyperbeam, min_area_pix, | |
delta=0.): | |
""" | |
Find bounding box for part of image containing source. | |
:param array: | |
Numpy 2D array with image. | |
:param level: | |
Level at which threshold image in image units. | |
:param min_maxintensity_mjyperbeam: | |
Minimum of the maximum intensity in the region to include. | |
:param min_area_pix: | |
Minimum area for region to include. | |
:param delta: (optional) | |
Extra space to add symmetrically [pixels]. (default: ``0``) | |
:return: | |
Tuples of BLC & TRC. | |
:note: | |
This is BLC, TRC for numpy array (i.e. transposed source map as it | |
conventionally seen on VLBI maps). | |
""" | |
signal = array > level | |
s = generate_binary_structure(2, 2) | |
labeled_array, num_features = label(signal, structure=s) | |
props = regionprops(labeled_array, intensity_image=array) | |
signal_props = list() | |
for prop in props: | |
if prop.max_intensity > min_maxintensity_mjyperbeam/1000 and prop.area > min_area_pix: | |
signal_props.append(prop) | |
blcs = list() | |
trcs = list() | |
for prop in signal_props: | |
bbox = prop.bbox | |
blc = (int(bbox[1]), int(bbox[0])) | |
trc = (int(bbox[3]), int(bbox[2])) | |
blcs.append(blc) | |
trcs.append(trc) | |
min_blc_0 = min([blc[0] for blc in blcs]) | |
min_blc_1 = min([blc[1] for blc in blcs]) | |
max_trc_0 = max([trc[0] for trc in trcs]) | |
max_trc_1 = max([trc[1] for trc in trcs]) | |
return (min_blc_0-delta, min_blc_1-delta), (max_trc_0+delta, max_trc_1+delta) | |
def quadratize_bbox(blc, trc, image_size): | |
""" | |
Symmetrically enlarge shortest rectangular side and make bounding box | |
quadratic. | |
""" | |
new_blc = [blc[0], blc[1]] | |
new_trc = [trc[0], trc[1]] | |
a = trc[1] - blc[1] | |
b = trc[0] - blc[0] | |
delta = abs(b - a) | |
half_delta = delta // 2 | |
remainder_delta = delta % 2 | |
if a < b: | |
new_trc[1] += half_delta | |
new_blc[1] -= half_delta | |
if remainder_delta: | |
new_blc[1] -= 1 | |
else: | |
new_trc[0] += half_delta | |
new_blc[0] -= half_delta | |
if remainder_delta: | |
new_blc[0] -= 1 | |
new_blc, new_trc = check_bbox(new_blc, new_trc, image_size) | |
return tuple(new_blc), tuple(new_trc) | |
def convert_bbox_to_radec_ranges(wcs, blc, trc): | |
""" | |
Given BLC, TRC in coordinates of numpy array return RA, DEC ranges. | |
:param wcs: | |
Instance of ``astropy.wcs.wcs.WCS``. | |
:return: | |
RA_min, RA_max, DEC_min, DEC_max | |
""" | |
blc_deg = wcs.all_pix2world(blc[0], blc[1], 1) | |
trc_deg = wcs.all_pix2world(trc[0], trc[1], 1) | |
ra_max = blc_deg[0] | |
ra_min = trc_deg[0] | |
dec_min = blc_deg[1] | |
dec_max = trc_deg[1] | |
return ra_min, ra_max, dec_min, dec_max | |
def check_bbox(blc, trc, image_size): | |
""" | |
:note: | |
This can make quadratic image rectangular. | |
""" | |
# If some bottom corner coordinate become negative | |
blc = list(blc) | |
trc = list(trc) | |
if blc[0] < 0: | |
blc[0] = 0 | |
if blc[1] < 0: | |
blc[1] = 0 | |
# If some top corner coordinate become large than image size | |
if trc[0] > image_size: | |
delta = abs(trc[0]-image_size) | |
blc[0] -= delta | |
# Check if shift have not made it negative | |
if blc[0] < 0 and trc[0] > image_size: | |
blc[0] = 0 | |
trc[0] -= delta | |
if trc[1] > image_size: | |
delta = abs(trc[1]-image_size) | |
blc[1] -= delta | |
# Check if shift have not made it negative | |
if blc[1] < 0 and trc[1] > image_size: | |
blc[1] = 0 | |
trc[1] -= delta | |
return tuple(blc), tuple(trc) | |
def run(path_to_icn_fits, n_std_bbox=3, n_std_lowest_contour=3, | |
image_save_fname=None, quadratic_image=True, | |
n_std_min_maxintensity=4.0, min_area_pix=100, | |
beam=None): | |
""" | |
:param path_to_icn_fits: | |
Path to FITS-file with CLEAN image. | |
:param n_std_bbox: (optional) | |
Number of noise std to make bounding box. (default: ``3``) | |
:param n_std_lowest_contour: (optional) | |
Number of noise std for lowest contour. (default: ``3``) | |
:param image_save_fname: (optional) | |
Path to save image. If ``None`` then do not save and only show and | |
return a figure. (default: ``None``) | |
:param quadratic_image: (optional) | |
Boolean. Make image quadratic? (default: ``True``) | |
:param n_std_min_maxintensity: (optional) | |
Minimum of the maximum intensity in the region to include (in number of | |
RMS). (default: ``4.0``) | |
:param min_area_pix: (optional) | |
Minimum area for region to include. (default: ``100``) | |
:beam: (optional) | |
Beam (BMAJ, BMIN, BPA) where BMAJ & BMIN are in mas and BPA in deg. If | |
``None`` then do not plot the beam. (default: ``None``) | |
:return: | |
Dictionary with keys "quadratic_bbox", "rectangular_bbox", "figure", | |
"mad_std", etc. | |
""" | |
icn = pf.getdata(path_to_icn_fits) | |
header = pf.getheader(path_to_icn_fits) | |
name = header["OBJECT"] | |
freq = header["CRVAL3"]/10**9 | |
epoch = header["DATE-OBS"] | |
wcs = WCS(header) | |
# Ignore FREQ, STOKES - only RA, DEC matters here | |
wcs = wcs.celestial | |
# Make offset coordinates | |
wcs.wcs.crval = 0., 0. | |
wcs.wcs.ctype = 'XOFFSET', 'YOFFSET' | |
wcs.wcs.cunit = 'mas', 'mas' | |
wcs.wcs.cdelt = (wcs.wcs.cdelt * u.deg).to(u.mas) | |
# Remove one-sized dimensions (RA, DEC, ...) | |
icn = icn.squeeze() | |
image_size = icn.shape[0] | |
peak = icn.max() | |
# Robustly estimate image pixels std | |
std = mad_std(icn) | |
# Find preliminary bounding box | |
blc, trc = find_bbox(icn, level=n_std_bbox*std, | |
min_maxintensity_mjyperbeam=n_std_min_maxintensity*std, | |
min_area_pix=min_area_pix, | |
delta=0) | |
# Now mask out source emission using found bounding box and estimate std | |
# more accurately | |
mask = np.zeros(icn.shape) | |
mask[blc[1]: trc[1], blc[0]: trc[0]] = 1 | |
outside_icn = np.ma.array(icn, mask=mask) | |
std = mad_std(outside_icn) | |
# Final bounding box estimation | |
blc_rec, trc_rec = find_bbox(icn, level=n_std_bbox*std, | |
min_maxintensity_mjyperbeam=n_std_min_maxintensity*std, | |
min_area_pix=min_area_pix, | |
delta=0) | |
blc_rec_ = blc_rec | |
trc_rec_ = trc_rec | |
blc_rec_, trc_rec_ = check_bbox(blc_rec_, trc_rec_, image_size) | |
# Enlarge 10% each side | |
delta_ra = abs(trc_rec[0]-blc_rec[0]) | |
delta_dec = abs(trc_rec[1]-blc_rec[1]) | |
blc_rec = (blc_rec[0] - int(0.1*delta_ra), blc_rec[1] - int(0.1*delta_dec)) | |
trc_rec = (trc_rec[0] + int(0.1*delta_ra), trc_rec[1] + int(0.1*delta_dec)) | |
blc_rec, trc_rec = check_bbox(blc_rec, trc_rec, image_size) | |
ra_min_rec, ra_max_rec, dec_min_rec, dec_max_rec = convert_bbox_to_radec_ranges(wcs, blc_rec, trc_rec) | |
# Quadratize un-enlarged rectangular bbox | |
blc, trc = quadratize_bbox(blc_rec_, trc_rec_, image_size) | |
# Enlarge 10% each side | |
delta = int(0.1*abs(trc[0]-blc[0])) | |
# There are limits for enlarging | |
if blc[0]-delta < 0: | |
delta = min(delta, blc[0]) | |
if blc[1]-delta < 0: | |
delta = min(delta, blc[1]) | |
delta = min(delta, | |
abs(image_size - trc[0] - delta), | |
abs(image_size - trc[0] - delta)) | |
blc = (blc[0] - delta, blc[1] - delta) | |
trc = (trc[0] + delta, trc[1] + delta) | |
# Now check if resulting map size is less than 10xbeams and enlarge if so. | |
low_limit_map_size = int(5*(bmaj_pix+bmin_pix)) | |
if abs(blc[0]-trc[0]) < low_limit_map_size: | |
delta = int(0.5*(low_limit_map_size - abs(blc[0]-trc[0]))) | |
blc = (blc[0] - delta, blc[1] - delta) | |
trc = (trc[0] + delta, trc[1] + delta) | |
ra_min, ra_max, dec_min, dec_max = convert_bbox_to_radec_ranges(wcs, blc, trc) | |
fig = plt.figure(figsize=(12, 12)) | |
axes = fig.add_axes([0.1, 0.1, 0.9, 0.9], projection=wcs, aspect='equal') | |
axes.coords[0].set_axislabel('Relative Right Ascension (mas)', size='large') | |
axes.coords[1].set_axislabel('Relative Declination (mas)', size='large') | |
lev0 = n_std_lowest_contour*std | |
levels = lev0 * np.logspace(0, 30, num=31, base=np.sqrt(2)) | |
levels = np.hstack(([-lev0], levels)) | |
levels = levels[levels < icn.max()] | |
# Should we plot quadratic or original rectangular image? | |
if quadratic_image: | |
blc_ = blc | |
trc_ = trc | |
else: | |
blc_ = blc_rec | |
trc_ = trc_rec | |
axes.contour(icn[blc_[1]: trc_[1], blc_[0]: trc_[0]], levels=levels, | |
extent=[blc_[0], trc_[0], blc_[1], trc_[1]], colors='k', | |
linewidths=0.75) | |
if beam is not None: | |
# Beam params in mas | |
bmaj = beam[0] | |
bmin = beam[1] | |
bpa = beam[2] | |
xmin, xmax = axes.get_xlim() | |
ymin, ymax = axes.get_ylim() | |
ell = Ellipse((xmin-0.6*bmaj/wcs.wcs.cdelt[0], ymin+0.6*bmaj/wcs.wcs.cdelt[1]), | |
bmaj/wcs.wcs.cdelt[1], bmin/wcs.wcs.cdelt[1], | |
angle=-(90-bpa), ec='k', fc='green', alpha=0.5, fill=True) | |
axes.add_patch(ell) | |
axes.set_title('{}, {}, {:.2f} GHz\n' | |
'Peak: {}, RMS: {}, from:' | |
' {} mJy/beam\n' | |
'Beam: {} x {} at {} deg.'.format(name, epoch, freq, | |
format(1e3*peak, '.1f'), | |
format(1e3*std, '.2f'), | |
format(1e3*levels[1], '.2f'), | |
format(bmaj, '.2f'), | |
format(bmin, '.2f'), | |
format(bpa, '.1f')), | |
size='x-large', loc="left", pad=10) | |
if image_save_fname is not None: | |
fig.savefig(image_save_fname, bbox_inches="tight") | |
plt.close() | |
return {"quadratic_bbox": (blc, trc), | |
"rectangular_bbox": (blc_rec, trc_rec), | |
"figure": fig, | |
"mad_std": std, | |
"RA_min": ra_min, "RA_max": ra_max, | |
"DEC_min": dec_min, "DEC_max": dec_max, | |
"RA_min_rec": ra_min_rec, "RA_max_rec": ra_max_rec, | |
"DEC_min_rec": dec_min_rec, "DEC_max_rec": dec_max_rec} | |
if __name__ == "__main__": | |
parser = \ | |
argparse.ArgumentParser(description='Estimate image bounded boxes.') | |
parser.add_argument('-fits_dir', action='store', nargs='?', type=str, | |
metavar='DIRECTORY WITH FITS FILES', default=None, | |
help='Directory containing FITS-files with CLEAN images.') | |
parser.add_argument('-extra_params_file', action='store', nargs='?', type=str, | |
metavar='PATH TO FILE WITH PARAMETERS FOR INDIVIDUAL SOURCE', default=None, | |
help='Path to ini-format file with parameters for individual sources' | |
' (optional). For such sources parameters described in this file' | |
' will override the default ones. Name of the source should' | |
' coincide with those from FITS header. Assuming that source names' | |
' are sections and options are properties in INI file.' | |
' See https://en.wikipedia.org/wiki/INI_file for details.') | |
parser.add_argument('-n_std_min_maxintensity', action='store', | |
nargs='?', default=4.0, | |
type=float, help='Minimum of the maximum intensity in' | |
' the region to include in RNS units.' | |
' Default: 4.0', | |
metavar='POSITIVE FLOAT') | |
parser.add_argument('-min_area_beams', action='store', | |
nargs='?', default=2.0, | |
type=float, help='Minimum area (number of beams) for' | |
' region to include. Default: 2.0', | |
metavar='POSITIVE FLOAT') | |
parser.add_argument('-n_std_bbox', action='store', nargs='?', default=4.0, | |
type=float, help='Number of noise std to make bounding' | |
' box. Default: 4.0', | |
metavar='POSITIVE FLOAT') | |
parser.add_argument('-n_std_lowest_contour', action='store', nargs='?', | |
default=4.0, | |
type=float, help='Number of noise std for lowest' | |
' contour. Default: 4.0', | |
metavar='POSITIVE FLOAT') | |
parser.add_argument('-rectangular', action='store_true', | |
dest='rectangular', | |
default=False, | |
help='Make image rectangular? Default: Make quadratic.') | |
parser.add_argument('-plot', action='store_true', | |
dest='plot', | |
default=False, | |
help='Make and save image? Default: Just create a table.') | |
parser.add_argument('-out_fname', action='store', nargs='?', type=str, | |
metavar='NAME OF THE OUTPUT TABLE', default=None, | |
help='Name of table (w/o extension) with boxes.' | |
' Default: ``result``') | |
args = parser.parse_args() | |
fits_dir = args.fits_dir | |
if fits_dir is None: | |
fits_dir = os.getcwd() | |
fits_files = glob.glob(os.path.join(fits_dir, "*.fits*")) | |
extra_params_file = args.extra_params_file | |
n_std_min_maxintensity = args.n_std_min_maxintensity | |
min_area_beams = args.min_area_beams | |
n_std_bbox = args.n_std_bbox | |
n_std_lowest_contour = args.n_std_lowest_contour | |
rectangular = args.rectangular | |
plot = args.plot | |
out_fname = args.out_fname | |
if out_fname is None: | |
out_fname = "result.txt" | |
else: | |
out_fname += ".txt" | |
default_options_dict = {"n_std_min_maxintensity": n_std_min_maxintensity, | |
"min_area_beams": min_area_beams, | |
"n_std_bbox": n_std_bbox, | |
"n_std_lowest_contour": n_std_lowest_contour, | |
"rectangular": rectangular, | |
"out_fname": out_fname, | |
"plot": plot} | |
df = pd.DataFrame(columns=["source", "epoch", "freq_ghz", "std", | |
"RA_min", "RA_max", "DEC_min", "DEC_max", | |
"RA_min_rec", "RA_max_rec", "DEC_min_rec", "DEC_max_rec"]) | |
# Read additional parameter file for some sources (if exists) | |
if extra_params_file is not None: | |
config = configparser.ConfigParser() | |
config.read(extra_params_file) | |
special_sources = config.sections() | |
if not special_sources: | |
raise Exception("Passed extra_params_file {} with no sources") | |
else: | |
special_sources = list() | |
for fits_file in fits_files: | |
# Create local (current source) dictionary of the parameters | |
local_options_dict = default_options_dict.copy() | |
header = pf.getheader(fits_file) | |
name = header["OBJECT"] | |
freq = header["CRVAL3"]/10**9 | |
epoch = header["DATE-OBS"] | |
print("Processing source {}, epoch {}, frequency {:.1f} GHz".format(name, epoch, freq)) | |
# Assuming quadratic pixel | |
bmaj_pix = header["BMAJ"]/abs(header["CDELT1"]) | |
bmin_pix = header["BMIN"]/abs(header["CDELT1"]) | |
# Square of ellipse | |
beam_pixels = np.pi*bmaj_pix*bmin_pix | |
# Beam parameters in mas and deg | |
beam_mas = (header["BMAJ"]*deg_to_mas, header["BMIN"]*deg_to_mas, | |
header["BPA"]) | |
# Check if source comes with its own parameters from extra_params_file and override default | |
# dictionary of the parameters | |
if name in special_sources: | |
for option in config[name]: | |
print(option) | |
if option in ("plot", "rectangular"): | |
local_options_dict[option] = config.getboolean(name, option) | |
elif option in ("n_std_min_maxintensity", "min_area_beams", "n_std_bbox", | |
"n_std_lowest_contour"): | |
local_options_dict[option] = config.getfloat(name, option) | |
elif option in ("out_fname"): | |
local_options_dict[option] = config[name][option] | |
else: | |
raise Exception("Unknown option {} in file {} for source {}".format(option, | |
extra_params_file, | |
name)) | |
print(local_options_dict) | |
if local_options_dict["plot"]: | |
image_save_fname = "{}_{}.png".format(name, epoch.replace('-', '_')) | |
else: | |
image_save_fname = None | |
result = run(fits_file, | |
n_std_min_maxintensity=local_options_dict["n_std_min_maxintensity"], | |
min_area_pix=local_options_dict["min_area_beams"]*beam_pixels, | |
n_std_bbox=local_options_dict["n_std_bbox"], | |
n_std_lowest_contour=local_options_dict["n_std_lowest_contour"], | |
image_save_fname=image_save_fname, | |
quadratic_image=~local_options_dict["rectangular"], | |
beam=beam_mas) | |
blc, trc = result["quadratic_bbox"] | |
blc_rec, trc_rec = result["rectangular_bbox"] | |
std = result["mad_std"] | |
df_ = pd.Series({"source": name, "epoch": epoch, | |
"freq_ghz": format(freq, '.1f'), | |
"std": format(1e3*std, '.3f'), | |
"RA_min": format(result["RA_min"], '.2f'), | |
"RA_max": format(result["RA_max"], '.2f'), | |
"DEC_min": format(result["DEC_min"], '.2f'), | |
"DEC_max": format(result["DEC_max"], '.2f'), | |
"RA_min_rec": format(result["RA_min_rec"], '.2f'), | |
"RA_max_rec": format(result["RA_max_rec"], '.2f'), | |
"DEC_min_rec": format(result["DEC_min_rec"], '.2f'), | |
"DEC_max_rec": format(result["DEC_max_rec"], '.2f')}) | |
if local_options_dict["out_fname"] == default_options_dict["out_fname"]: | |
df = df.append(df_, ignore_index=True) | |
else: | |
df_local = pd.DataFrame(columns=["source", "epoch", "freq_ghz", "std", | |
"RA_min", "RA_max", "DEC_min", "DEC_max", | |
"RA_min_rec", "RA_max_rec", "DEC_min_rec", "DEC_max_rec"]) | |
df_local = df_local.append(df_, ignore_index=True) | |
df_local.to_csv(local_options_dict["out_fname"], sep=" ", index=False, header=True) | |
df.to_csv(default_options_dict["out_fname"], sep=" ", index=False, header=True) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment