Skip to content

Instantly share code, notes, and snippets.

@ipashchenko
Last active October 4, 2019 16:16
Show Gist options
  • Save ipashchenko/fb7ac8ce8a072ba114fb47b464bf063b to your computer and use it in GitHub Desktop.
Save ipashchenko/fb7ac8ce8a072ba114fb47b464bf063b to your computer and use it in GitHub Desktop.
Find bounding box of source emission.
[1803+784]
n_std_lowest_contour = 1
[J1800+78]
n_std_lowest_contour = 10
plot = no
out_fname = other_name.txt
import os
import glob
import numpy as np
import pandas as pd
import astropy.io.fits as pf
import astropy.units as u
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.patches import Ellipse
from astropy.stats import mad_std
from astropy.wcs import WCS
from scipy.ndimage.measurements import label
from scipy.ndimage.morphology import generate_binary_structure
from skimage.measure import regionprops
import argparse
import configparser
deg_to_mas = u.deg.to(u.mas)
label_size = 16
matplotlib.rcParams['xtick.labelsize'] = label_size
matplotlib.rcParams['ytick.labelsize'] = label_size
matplotlib.rcParams['axes.titlesize'] = label_size
matplotlib.rcParams['axes.labelsize'] = label_size
matplotlib.rcParams['font.size'] = label_size
matplotlib.rcParams['legend.fontsize'] = label_size
matplotlib.rcParams["contour.negative_linestyle"] = 'dotted'
def find_bbox(array, level, min_maxintensity_mjyperbeam, min_area_pix,
delta=0.):
"""
Find bounding box for part of image containing source.
:param array:
Numpy 2D array with image.
:param level:
Level at which threshold image in image units.
:param min_maxintensity_mjyperbeam:
Minimum of the maximum intensity in the region to include.
:param min_area_pix:
Minimum area for region to include.
:param delta: (optional)
Extra space to add symmetrically [pixels]. (default: ``0``)
:return:
Tuples of BLC & TRC.
:note:
This is BLC, TRC for numpy array (i.e. transposed source map as it
conventionally seen on VLBI maps).
"""
signal = array > level
s = generate_binary_structure(2, 2)
labeled_array, num_features = label(signal, structure=s)
props = regionprops(labeled_array, intensity_image=array)
signal_props = list()
for prop in props:
if prop.max_intensity > min_maxintensity_mjyperbeam/1000 and prop.area > min_area_pix:
signal_props.append(prop)
blcs = list()
trcs = list()
for prop in signal_props:
bbox = prop.bbox
blc = (int(bbox[1]), int(bbox[0]))
trc = (int(bbox[3]), int(bbox[2]))
blcs.append(blc)
trcs.append(trc)
min_blc_0 = min([blc[0] for blc in blcs])
min_blc_1 = min([blc[1] for blc in blcs])
max_trc_0 = max([trc[0] for trc in trcs])
max_trc_1 = max([trc[1] for trc in trcs])
return (min_blc_0-delta, min_blc_1-delta), (max_trc_0+delta, max_trc_1+delta)
def quadratize_bbox(blc, trc, image_size):
"""
Symmetrically enlarge shortest rectangular side and make bounding box
quadratic.
"""
new_blc = [blc[0], blc[1]]
new_trc = [trc[0], trc[1]]
a = trc[1] - blc[1]
b = trc[0] - blc[0]
delta = abs(b - a)
half_delta = delta // 2
remainder_delta = delta % 2
if a < b:
new_trc[1] += half_delta
new_blc[1] -= half_delta
if remainder_delta:
new_blc[1] -= 1
else:
new_trc[0] += half_delta
new_blc[0] -= half_delta
if remainder_delta:
new_blc[0] -= 1
new_blc, new_trc = check_bbox(new_blc, new_trc, image_size)
return tuple(new_blc), tuple(new_trc)
def convert_bbox_to_radec_ranges(wcs, blc, trc):
"""
Given BLC, TRC in coordinates of numpy array return RA, DEC ranges.
:param wcs:
Instance of ``astropy.wcs.wcs.WCS``.
:return:
RA_min, RA_max, DEC_min, DEC_max
"""
blc_deg = wcs.all_pix2world(blc[0], blc[1], 1)
trc_deg = wcs.all_pix2world(trc[0], trc[1], 1)
ra_max = blc_deg[0]
ra_min = trc_deg[0]
dec_min = blc_deg[1]
dec_max = trc_deg[1]
return ra_min, ra_max, dec_min, dec_max
def check_bbox(blc, trc, image_size):
"""
:note:
This can make quadratic image rectangular.
"""
# If some bottom corner coordinate become negative
blc = list(blc)
trc = list(trc)
if blc[0] < 0:
blc[0] = 0
if blc[1] < 0:
blc[1] = 0
# If some top corner coordinate become large than image size
if trc[0] > image_size:
delta = abs(trc[0]-image_size)
blc[0] -= delta
# Check if shift have not made it negative
if blc[0] < 0 and trc[0] > image_size:
blc[0] = 0
trc[0] -= delta
if trc[1] > image_size:
delta = abs(trc[1]-image_size)
blc[1] -= delta
# Check if shift have not made it negative
if blc[1] < 0 and trc[1] > image_size:
blc[1] = 0
trc[1] -= delta
return tuple(blc), tuple(trc)
def run(path_to_icn_fits, n_std_bbox=3, n_std_lowest_contour=3,
image_save_fname=None, quadratic_image=True,
n_std_min_maxintensity=4.0, min_area_pix=100,
beam=None):
"""
:param path_to_icn_fits:
Path to FITS-file with CLEAN image.
:param n_std_bbox: (optional)
Number of noise std to make bounding box. (default: ``3``)
:param n_std_lowest_contour: (optional)
Number of noise std for lowest contour. (default: ``3``)
:param image_save_fname: (optional)
Path to save image. If ``None`` then do not save and only show and
return a figure. (default: ``None``)
:param quadratic_image: (optional)
Boolean. Make image quadratic? (default: ``True``)
:param n_std_min_maxintensity: (optional)
Minimum of the maximum intensity in the region to include (in number of
RMS). (default: ``4.0``)
:param min_area_pix: (optional)
Minimum area for region to include. (default: ``100``)
:beam: (optional)
Beam (BMAJ, BMIN, BPA) where BMAJ & BMIN are in mas and BPA in deg. If
``None`` then do not plot the beam. (default: ``None``)
:return:
Dictionary with keys "quadratic_bbox", "rectangular_bbox", "figure",
"mad_std", etc.
"""
icn = pf.getdata(path_to_icn_fits)
header = pf.getheader(path_to_icn_fits)
name = header["OBJECT"]
freq = header["CRVAL3"]/10**9
epoch = header["DATE-OBS"]
wcs = WCS(header)
# Ignore FREQ, STOKES - only RA, DEC matters here
wcs = wcs.celestial
# Make offset coordinates
wcs.wcs.crval = 0., 0.
wcs.wcs.ctype = 'XOFFSET', 'YOFFSET'
wcs.wcs.cunit = 'mas', 'mas'
wcs.wcs.cdelt = (wcs.wcs.cdelt * u.deg).to(u.mas)
# Remove one-sized dimensions (RA, DEC, ...)
icn = icn.squeeze()
image_size = icn.shape[0]
peak = icn.max()
# Robustly estimate image pixels std
std = mad_std(icn)
# Find preliminary bounding box
blc, trc = find_bbox(icn, level=n_std_bbox*std,
min_maxintensity_mjyperbeam=n_std_min_maxintensity*std,
min_area_pix=min_area_pix,
delta=0)
# Now mask out source emission using found bounding box and estimate std
# more accurately
mask = np.zeros(icn.shape)
mask[blc[1]: trc[1], blc[0]: trc[0]] = 1
outside_icn = np.ma.array(icn, mask=mask)
std = mad_std(outside_icn)
# Final bounding box estimation
blc_rec, trc_rec = find_bbox(icn, level=n_std_bbox*std,
min_maxintensity_mjyperbeam=n_std_min_maxintensity*std,
min_area_pix=min_area_pix,
delta=0)
blc_rec_ = blc_rec
trc_rec_ = trc_rec
blc_rec_, trc_rec_ = check_bbox(blc_rec_, trc_rec_, image_size)
# Enlarge 10% each side
delta_ra = abs(trc_rec[0]-blc_rec[0])
delta_dec = abs(trc_rec[1]-blc_rec[1])
blc_rec = (blc_rec[0] - int(0.1*delta_ra), blc_rec[1] - int(0.1*delta_dec))
trc_rec = (trc_rec[0] + int(0.1*delta_ra), trc_rec[1] + int(0.1*delta_dec))
blc_rec, trc_rec = check_bbox(blc_rec, trc_rec, image_size)
ra_min_rec, ra_max_rec, dec_min_rec, dec_max_rec = convert_bbox_to_radec_ranges(wcs, blc_rec, trc_rec)
# Quadratize un-enlarged rectangular bbox
blc, trc = quadratize_bbox(blc_rec_, trc_rec_, image_size)
# Enlarge 10% each side
delta = int(0.1*abs(trc[0]-blc[0]))
# There are limits for enlarging
if blc[0]-delta < 0:
delta = min(delta, blc[0])
if blc[1]-delta < 0:
delta = min(delta, blc[1])
delta = min(delta,
abs(image_size - trc[0] - delta),
abs(image_size - trc[0] - delta))
blc = (blc[0] - delta, blc[1] - delta)
trc = (trc[0] + delta, trc[1] + delta)
# Now check if resulting map size is less than 10xbeams and enlarge if so.
low_limit_map_size = int(5*(bmaj_pix+bmin_pix))
if abs(blc[0]-trc[0]) < low_limit_map_size:
delta = int(0.5*(low_limit_map_size - abs(blc[0]-trc[0])))
blc = (blc[0] - delta, blc[1] - delta)
trc = (trc[0] + delta, trc[1] + delta)
ra_min, ra_max, dec_min, dec_max = convert_bbox_to_radec_ranges(wcs, blc, trc)
fig = plt.figure(figsize=(12, 12))
axes = fig.add_axes([0.1, 0.1, 0.9, 0.9], projection=wcs, aspect='equal')
axes.coords[0].set_axislabel('Relative Right Ascension (mas)', size='large')
axes.coords[1].set_axislabel('Relative Declination (mas)', size='large')
lev0 = n_std_lowest_contour*std
levels = lev0 * np.logspace(0, 30, num=31, base=np.sqrt(2))
levels = np.hstack(([-lev0], levels))
levels = levels[levels < icn.max()]
# Should we plot quadratic or original rectangular image?
if quadratic_image:
blc_ = blc
trc_ = trc
else:
blc_ = blc_rec
trc_ = trc_rec
axes.contour(icn[blc_[1]: trc_[1], blc_[0]: trc_[0]], levels=levels,
extent=[blc_[0], trc_[0], blc_[1], trc_[1]], colors='k',
linewidths=0.75)
if beam is not None:
# Beam params in mas
bmaj = beam[0]
bmin = beam[1]
bpa = beam[2]
xmin, xmax = axes.get_xlim()
ymin, ymax = axes.get_ylim()
ell = Ellipse((xmin-0.6*bmaj/wcs.wcs.cdelt[0], ymin+0.6*bmaj/wcs.wcs.cdelt[1]),
bmaj/wcs.wcs.cdelt[1], bmin/wcs.wcs.cdelt[1],
angle=-(90-bpa), ec='k', fc='green', alpha=0.5, fill=True)
axes.add_patch(ell)
axes.set_title('{}, {}, {:.2f} GHz\n'
'Peak: {}, RMS: {}, from:'
' {} mJy/beam\n'
'Beam: {} x {} at {} deg.'.format(name, epoch, freq,
format(1e3*peak, '.1f'),
format(1e3*std, '.2f'),
format(1e3*levels[1], '.2f'),
format(bmaj, '.2f'),
format(bmin, '.2f'),
format(bpa, '.1f')),
size='x-large', loc="left", pad=10)
if image_save_fname is not None:
fig.savefig(image_save_fname, bbox_inches="tight")
plt.close()
return {"quadratic_bbox": (blc, trc),
"rectangular_bbox": (blc_rec, trc_rec),
"figure": fig,
"mad_std": std,
"RA_min": ra_min, "RA_max": ra_max,
"DEC_min": dec_min, "DEC_max": dec_max,
"RA_min_rec": ra_min_rec, "RA_max_rec": ra_max_rec,
"DEC_min_rec": dec_min_rec, "DEC_max_rec": dec_max_rec}
if __name__ == "__main__":
parser = \
argparse.ArgumentParser(description='Estimate image bounded boxes.')
parser.add_argument('-fits_dir', action='store', nargs='?', type=str,
metavar='DIRECTORY WITH FITS FILES', default=None,
help='Directory containing FITS-files with CLEAN images.')
parser.add_argument('-extra_params_file', action='store', nargs='?', type=str,
metavar='PATH TO FILE WITH PARAMETERS FOR INDIVIDUAL SOURCE', default=None,
help='Path to ini-format file with parameters for individual sources'
' (optional). For such sources parameters described in this file'
' will override the default ones. Name of the source should'
' coincide with those from FITS header. Assuming that source names'
' are sections and options are properties in INI file.'
' See https://en.wikipedia.org/wiki/INI_file for details.')
parser.add_argument('-n_std_min_maxintensity', action='store',
nargs='?', default=4.0,
type=float, help='Minimum of the maximum intensity in'
' the region to include in RNS units.'
' Default: 4.0',
metavar='POSITIVE FLOAT')
parser.add_argument('-min_area_beams', action='store',
nargs='?', default=2.0,
type=float, help='Minimum area (number of beams) for'
' region to include. Default: 2.0',
metavar='POSITIVE FLOAT')
parser.add_argument('-n_std_bbox', action='store', nargs='?', default=4.0,
type=float, help='Number of noise std to make bounding'
' box. Default: 4.0',
metavar='POSITIVE FLOAT')
parser.add_argument('-n_std_lowest_contour', action='store', nargs='?',
default=4.0,
type=float, help='Number of noise std for lowest'
' contour. Default: 4.0',
metavar='POSITIVE FLOAT')
parser.add_argument('-rectangular', action='store_true',
dest='rectangular',
default=False,
help='Make image rectangular? Default: Make quadratic.')
parser.add_argument('-plot', action='store_true',
dest='plot',
default=False,
help='Make and save image? Default: Just create a table.')
parser.add_argument('-out_fname', action='store', nargs='?', type=str,
metavar='NAME OF THE OUTPUT TABLE', default=None,
help='Name of table (w/o extension) with boxes.'
' Default: ``result``')
args = parser.parse_args()
fits_dir = args.fits_dir
if fits_dir is None:
fits_dir = os.getcwd()
fits_files = glob.glob(os.path.join(fits_dir, "*.fits*"))
extra_params_file = args.extra_params_file
n_std_min_maxintensity = args.n_std_min_maxintensity
min_area_beams = args.min_area_beams
n_std_bbox = args.n_std_bbox
n_std_lowest_contour = args.n_std_lowest_contour
rectangular = args.rectangular
plot = args.plot
out_fname = args.out_fname
if out_fname is None:
out_fname = "result.txt"
else:
out_fname += ".txt"
default_options_dict = {"n_std_min_maxintensity": n_std_min_maxintensity,
"min_area_beams": min_area_beams,
"n_std_bbox": n_std_bbox,
"n_std_lowest_contour": n_std_lowest_contour,
"rectangular": rectangular,
"out_fname": out_fname,
"plot": plot}
df = pd.DataFrame(columns=["source", "epoch", "freq_ghz", "std",
"RA_min", "RA_max", "DEC_min", "DEC_max",
"RA_min_rec", "RA_max_rec", "DEC_min_rec", "DEC_max_rec"])
# Read additional parameter file for some sources (if exists)
if extra_params_file is not None:
config = configparser.ConfigParser()
config.read(extra_params_file)
special_sources = config.sections()
if not special_sources:
raise Exception("Passed extra_params_file {} with no sources")
else:
special_sources = list()
for fits_file in fits_files:
# Create local (current source) dictionary of the parameters
local_options_dict = default_options_dict.copy()
header = pf.getheader(fits_file)
name = header["OBJECT"]
freq = header["CRVAL3"]/10**9
epoch = header["DATE-OBS"]
print("Processing source {}, epoch {}, frequency {:.1f} GHz".format(name, epoch, freq))
# Assuming quadratic pixel
bmaj_pix = header["BMAJ"]/abs(header["CDELT1"])
bmin_pix = header["BMIN"]/abs(header["CDELT1"])
# Square of ellipse
beam_pixels = np.pi*bmaj_pix*bmin_pix
# Beam parameters in mas and deg
beam_mas = (header["BMAJ"]*deg_to_mas, header["BMIN"]*deg_to_mas,
header["BPA"])
# Check if source comes with its own parameters from extra_params_file and override default
# dictionary of the parameters
if name in special_sources:
for option in config[name]:
print(option)
if option in ("plot", "rectangular"):
local_options_dict[option] = config.getboolean(name, option)
elif option in ("n_std_min_maxintensity", "min_area_beams", "n_std_bbox",
"n_std_lowest_contour"):
local_options_dict[option] = config.getfloat(name, option)
elif option in ("out_fname"):
local_options_dict[option] = config[name][option]
else:
raise Exception("Unknown option {} in file {} for source {}".format(option,
extra_params_file,
name))
print(local_options_dict)
if local_options_dict["plot"]:
image_save_fname = "{}_{}.png".format(name, epoch.replace('-', '_'))
else:
image_save_fname = None
result = run(fits_file,
n_std_min_maxintensity=local_options_dict["n_std_min_maxintensity"],
min_area_pix=local_options_dict["min_area_beams"]*beam_pixels,
n_std_bbox=local_options_dict["n_std_bbox"],
n_std_lowest_contour=local_options_dict["n_std_lowest_contour"],
image_save_fname=image_save_fname,
quadratic_image=~local_options_dict["rectangular"],
beam=beam_mas)
blc, trc = result["quadratic_bbox"]
blc_rec, trc_rec = result["rectangular_bbox"]
std = result["mad_std"]
df_ = pd.Series({"source": name, "epoch": epoch,
"freq_ghz": format(freq, '.1f'),
"std": format(1e3*std, '.3f'),
"RA_min": format(result["RA_min"], '.2f'),
"RA_max": format(result["RA_max"], '.2f'),
"DEC_min": format(result["DEC_min"], '.2f'),
"DEC_max": format(result["DEC_max"], '.2f'),
"RA_min_rec": format(result["RA_min_rec"], '.2f'),
"RA_max_rec": format(result["RA_max_rec"], '.2f'),
"DEC_min_rec": format(result["DEC_min_rec"], '.2f'),
"DEC_max_rec": format(result["DEC_max_rec"], '.2f')})
if local_options_dict["out_fname"] == default_options_dict["out_fname"]:
df = df.append(df_, ignore_index=True)
else:
df_local = pd.DataFrame(columns=["source", "epoch", "freq_ghz", "std",
"RA_min", "RA_max", "DEC_min", "DEC_max",
"RA_min_rec", "RA_max_rec", "DEC_min_rec", "DEC_max_rec"])
df_local = df_local.append(df_, ignore_index=True)
df_local.to_csv(local_options_dict["out_fname"], sep=" ", index=False, header=True)
df.to_csv(default_options_dict["out_fname"], sep=" ", index=False, header=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment