Skip to content

Instantly share code, notes, and snippets.

@keflavich
Created March 19, 2022 18:56
Show Gist options
  • Save keflavich/165b10ce09b68fb3deda50df85cf2ce8 to your computer and use it in GitHub Desktop.
Save keflavich/165b10ce09b68fb3deda50df85cf2ce8 to your computer and use it in GitHub Desktop.
Old code to make an ALMA finder chart
"""
Utilities for making finder charts and overlay images for ALMA proposing
"""
import string
import os
import numpy as np
from astropy import wcs
from astroquery import log
from astropy import units as u
from astropy.io import fits
from astroquery.utils.commons import ASTROPY_LT_4_1
from astroquery.skyview import SkyView
from astroquery.alma import Alma
def pyregion_subset(region, data, mywcs):
"""
Return a subset of an image (``data``) given a region.
Parameters
----------
region : `~pyregion.Shape`
A Shape from a pyregion-parsed region file
data : np.ndarray
An array with shape described by WCS
mywcs : `astropy.wcs.WCS`
A world coordinate system describing the data
"""
import pyregion
shapelist = pyregion.ShapeList([region])
if shapelist[0].coord_format not in ('physical', 'image'):
celhdr = mywcs.sub([wcs.WCSSUB_CELESTIAL]).to_header()
pixel_regions = shapelist.as_imagecoord(celhdr)
else:
# For this to work, we'd need to change the reference pixel after
# cropping. Alternatively, we can just make the full-sized
# mask... todo....
raise NotImplementedError("Can't use non-celestial coordinates "
"with regions.")
pixel_regions = shapelist
# This is a hack to use mpl to determine the outer bounds of the regions
# (but it's a legit hack - pyregion needs a major internal refactor
# before we can approach this any other way, I think -AG)
mpl_objs = pixel_regions.get_mpl_patches_texts()[0]
# Find the minimal enclosing box containing all of the regions
# (this will speed up the mask creation below)
extent = mpl_objs[0].get_extents()
xlo, ylo = extent.min
xhi, yhi = extent.max
all_extents = [obj.get_extents() for obj in mpl_objs]
for ext in all_extents:
xlo = int(np.round(xlo if xlo < ext.min[0] else ext.min[0]))
ylo = int(np.round(ylo if ylo < ext.min[1] else ext.min[1]))
xhi = int(np.round(xhi if xhi > ext.max[0] else ext.max[0]))
yhi = int(np.round(yhi if yhi > ext.max[1] else ext.max[1]))
log.debug("Region boundaries: ")
log.debug("xlo={xlo}, ylo={ylo}, xhi={xhi}, yhi={yhi}".format(xlo=xlo,
ylo=ylo,
xhi=xhi,
yhi=yhi))
subwcs = mywcs[int(ylo):int(yhi), int(xlo):int(xhi)]
subhdr = subwcs.sub([wcs.WCSSUB_CELESTIAL]).to_header()
subdata = data[int(ylo):int(yhi), int(xlo):int(xhi)]
mask = shapelist.get_mask(header=subhdr,
shape=subdata.shape)
log.debug("Shapes: data={0}, subdata={2}, mask={1}"
.format(data.shape, mask.shape, subdata.shape))
return (xlo, xhi, ylo, yhi), mask
def parse_frequency_support(frequency_support_str):
"""
ALMA "Frequency Support" strings have the form:
[100.63..101.57GHz,488.28kHz, XX YY] U
[102.43..103.37GHz,488.28kHz, XX YY] U
[112.74..113.68GHz,488.28kHz, XX YY] U
[114.45..115.38GHz,488.28kHz, XX YY]
at least, as far as we have seen. The "U" is meant to be the Union symbol.
This function will parse such a string into a list of pairs of astropy
Quantities representing the frequency range. It will ignore the resolution
and polarizations.
"""
if not isinstance(frequency_support_str, str):
supports = frequency_support_str.tostring().decode('ascii').split('U')
else:
supports = frequency_support_str.split('U')
freq_ranges = [(float(sup[0]),
float(sup[1].split(',')[0].strip(string.ascii_letters))) *
u.Unit(sup[1].split(',')[0].strip(string.punctuation +
string.digits))
for i in supports for sup in [i.strip('[] ').split('..'), ]]
return u.Quantity(freq_ranges)
def footprint_to_reg(footprint):
"""
ALMA footprints have the form:
'Polygon ICRS 266.519781 -28.724666 266.524678 -28.731930 266.536683
-28.737784 266.543860 -28.737586 266.549277 -28.733370 266.558133
-28.729545 266.560136 -28.724666 266.558845 -28.719605 266.560133
-28.694332 266.555234 -28.687069 266.543232 -28.681216 266.536058
-28.681414 266.530644 -28.685630 266.521788 -28.689453 266.519784
-28.694332 266.521332 -28.699778'
Some of them have *additional* polygons
"""
if ASTROPY_LT_4_1:
footprint = footprint.decode('utf-8')
if footprint[:7] != 'Polygon' and footprint[:6] != 'Circle':
raise ValueError("Unrecognized footprint type")
from pyregion.parser_helper import Shape
reglist = []
entries = footprint.split()
if entries[0] == 'Circle':
reg = Shape('circle', [float(x) for x in entries[1:] if x != 'ICRS'])
reg.coord_format = 'icrs'
reg.coord_list = reg.params # the coord_list attribute is needed somewhere
reg.attr = ([], {'color': 'green', 'dash': '0 ', 'dashlist': '8 3',
'delete': '1 ', 'edit': '1 ', 'fixed': '0 ', 'font':
'"helvetica 10 normal roman"', 'highlite': '1 ',
'include': '1 ', 'move': '1 ', 'select': '1',
'source': '1', 'text': '', 'width': '1 '})
reglist.append(reg)
else:
polygons = [index for index, entry in enumerate(entries) if entry == 'Polygon']
for start, stop in zip(polygons, polygons[1:]+[None]):
reg = Shape('polygon', [float(x) for x in entries[start+1:stop] if x != 'ICRS'])
reg.coord_format = 'icrs'
reg.coord_list = reg.params # the coord_list attribute is needed somewhere
reg.attr = ([], {'color': 'green', 'dash': '0 ', 'dashlist': '8 3',
'delete': '1 ', 'edit': '1 ', 'fixed': '0 ', 'font':
'"helvetica 10 normal roman"', 'highlite': '1 ',
'include': '1 ', 'move': '1 ', 'select': '1',
'source': '1', 'text': '', 'width': '1 '})
reglist.append(reg)
return reglist
def add_meta_to_reg(reg, meta):
reg.meta = meta
return reg
def approximate_primary_beam_sizes(frequency_support_str,
dish_diameter=12 * u.m, first_null=1.220):
r"""
Using parse_frequency_support, determine the mean primary beam size in each
observed band
Parameters
----------
frequency_support_str : str
The frequency support string, see `parse_frequency_support`
dish_diameter : `~astropy.units.Quantity`
Meter-equivalent unit. The diameter of the dish.
first_null : float
The position of the first null of an Airy. Used to compute resolution
as :math:`R = 1.22 \lambda/D`
"""
freq_ranges = parse_frequency_support(frequency_support_str)
beam_sizes = [(first_null * fr.mean().to(u.m, u.spectral()) /
(dish_diameter)).to(u.arcsec, u.dimensionless_angles())
for fr in freq_ranges]
return u.Quantity(beam_sizes)
def make_finder_chart(target, radius, save_prefix, service=SkyView.get_images,
service_kwargs={'survey': ['2MASS-K'], 'pixels': 500},
alma_kwargs={'public': False, 'science': False},
**kwargs):
"""
Create a "finder chart" showing where ALMA has pointed in various bands,
including different color coding for public/private data and each band.
Contours are set at various integration times.
Parameters
----------
target : `astropy.coordinates` or str
A legitimate target name
radius : `~astropy.units.Quantity`
A degree-equivalent radius.
save_prefix : str
The prefix for the output files. Both .reg and .png files will be
written. The .reg files will have the band numbers and
public/private appended, while the .png file will be named
prefix_almafinderchart.png
service : function
The ``get_images`` function of an astroquery service, e.g. SkyView.
service_kwargs : dict
The keyword arguments to pass to the specified service. For example,
for SkyView, you can give it the survey ID (e.g., 2MASS-K) and the
number of pixels in the resulting image. See the documentation for the
individual services for more details.
alma_kwargs : dict
Keywords to pass to the ALMA archive when querying.
private_band_colors / public_band_colors : tuple
A tuple or list of colors to be associated with private/public
observations in the various bands
integration_time_contour_levels : list or np.array
The levels at which to draw contours in units of seconds. Default is
log-spaced (2^n) seconds: [ 1., 2., 4., 8., 16., 32.])
"""
log.info("Querying {0} for images".format(service))
images = service(target, radius=radius, **service_kwargs)
image0_hdu = images[0][0]
return make_finder_chart_from_image(image0_hdu, target=target,
radius=radius, save_prefix=save_prefix,
alma_kwargs=alma_kwargs,
**kwargs)
def make_finder_chart_from_image(image, target, radius, save_prefix,
alma_kwargs={'public': False,
'science': False,
'cache': False,
},
**kwargs):
"""
Create a "finder chart" showing where ALMA has pointed in various bands,
including different color coding for public/private data and each band.
Contours are set at various integration times.
Parameters
----------
image : fits.PrimaryHDU or fits.ImageHDU object
The image to overlay onto
target : `astropy.coordinates` or str
A legitimate target name
radius : `astropy.units.Quantity`
A degree-equivalent radius
save_prefix : str
The prefix for the output files. Both .reg and .png files will be
written. The .reg files will have the band numbers and
public/private appended, while the .png file will be named
prefix_almafinderchart.png
alma_kwargs : dict
Keywords to pass to the ALMA archive when querying.
private_band_colors / public_band_colors : tuple
A tuple or list of colors to be associated with private/public
observations in the various bands
integration_time_contour_levels : list or np.array
The levels at which to draw contours in units of seconds. Default is
log-spaced (2^n) seconds: [ 1., 2., 4., 8., 16., 32.])
"""
log.info("Querying ALMA around {0}".format(target))
catalog = Alma.query_region(coordinate=target, radius=radius,
get_html_version=True,
**alma_kwargs)
return make_finder_chart_from_image_and_catalog(image, catalog=catalog,
save_prefix=save_prefix,
**kwargs)
def make_finder_chart_from_image_and_catalog(image, catalog, save_prefix,
alma_kwargs={'public': False,
'science': False},
bands=(3, 4, 5, 6, 7, 8, 9, 10),
private_band_colors=('maroon',
'red',
'orange',
'coral',
'brown',
'yellow',
'mediumorchid',
'palegoldenrod',
),
public_band_colors=('blue',
'cyan',
'green',
'turquoise',
'teal',
'darkslategrey',
'chartreuse',
'lime',
),
integration_time_contour_levels=np.logspace(0,
5,
base=2,
num=6),
save_masks=False,
use_saved_masks=False,
linewidth=1,
):
"""
Create a "finder chart" showing where ALMA has pointed in various bands,
including different color coding for public/private data and each band.
Contours are set at various integration times.
Parameters
----------
image : fits.PrimaryHDU or fits.ImageHDU object
The image to overlay onto
catalog : astropy.Table object
The catalog of ALMA observations
save_prefix : str
The prefix for the output files. Both .reg and .png files will be
written. The .reg files will have the band numbers and
public/private appended, while the .png file will be named
prefix_almafinderchart.png
alma_kwargs : dict
Keywords to pass to the ALMA archive when querying.
private_band_colors / public_band_colors : tuple
A tuple or list of colors to be associated with private/public
observations in the various bands
integration_time_contour_levels : list or np.array
The levels at which to draw contours in units of seconds. Default is
log-spaced (2^n) seconds: [ 1., 2., 4., 8., 16., 32.])
"""
import aplpy
import pyregion
all_bands = bands
bands = used_bands = [band
for band in
np.unique(catalog['band_list'])]
log.info("The bands used include: {0}".format(used_bands))
band_colors_priv = dict(zip(all_bands, private_band_colors))
band_colors_pub = dict(zip(all_bands, public_band_colors))
log.info("Color map private: {0}".format(band_colors_priv))
log.info("Color map public: {0}".format(band_colors_pub))
if use_saved_masks:
hit_mask_public = {}
hit_mask_private = {}
for band in bands:
pubfile = '{0}_band{1}_public.fits'.format(save_prefix, band)
if os.path.exists(pubfile):
hit_mask_public[band] = fits.getdata(pubfile)
privfile = '{0}_band{1}_private.fits'.format(save_prefix, band)
if os.path.exists(privfile):
hit_mask_private[band] = fits.getdata(privfile)
else:
today = np.datetime64('today')
# At least temporarily obsolete
# private_circle_parameters = {
# band: [(row['RA'], row['Dec'], np.mean(rad).to(u.deg).value)
# for row, rad in zip(catalog, primary_beam_radii)
# if not row['Release date'] or
# (np.datetime64(row['Release date']) > today and row['Band'] == band)]
# for band in bands}
# public_circle_parameters = {
# band: [(row['RA'], row['Dec'], np.mean(rad).to(u.deg).value)
# for row, rad in zip(catalog, primary_beam_radii)
# if row['Release date'] and
# (np.datetime64(row['Release date']) <= today and row['Band'] == band)]
# for band in bands}
# unique_private_circle_parameters = {
# band: np.array(list(set(private_circle_parameters[band])))
# for band in bands}
# unique_public_circle_parameters = {
# band: np.array(list(set(public_circle_parameters[band])))
# for band in bands}
release_dates = np.array(catalog['obs_release_date'], dtype=np.datetime64)
for band in bands:
log.info("BAND {0}".format(band))
privrows = sum((catalog['band_list'] == band) &
(release_dates > today))
pubrows = sum((catalog['band_list'] == band) &
(release_dates <= today))
log.info("PUBLIC: Number of rows: {0}".format(pubrows,))
log.info("PRIVATE: Number of rows: {0}.".format(privrows))
log.debug('Creating regions')
prv_regions = {
band: pyregion.ShapeList([add_meta_to_reg(fp,
{'integration':
row['t_exptime']})
for row in catalog
for fp in footprint_to_reg(row['s_region'])
if (not row['obs_release_date']) or
(np.datetime64(row['obs_release_date']) >
today and row['band_list'] == band)])
for band in bands}
pub_regions = {
band: pyregion.ShapeList([add_meta_to_reg(fp,
{'integration':
row['t_exptime']})
for row in catalog
for fp in footprint_to_reg(row['s_region'])
if row['obs_release_date'] and
(np.datetime64(row['obs_release_date']) <=
today and row['band_list'] == band)])
for band in bands}
log.debug('Creating masks')
prv_mask = {
band: fits.PrimaryHDU(
prv_regions[band].get_mask(image).astype('int'),
header=image.header) for band in bands if prv_regions[band]}
pub_mask = {
band: fits.PrimaryHDU(
pub_regions[band].get_mask(image).astype('int'),
header=image.header) for band in bands if pub_regions[band]}
hit_mask_public = {band: np.zeros_like(image.data)
for band in pub_mask}
hit_mask_private = {band: np.zeros_like(image.data)
for band in prv_mask}
mywcs = wcs.WCS(image.header)
for band in bands:
log.debug('Adding integration-scaled masks for Band: {0}'.format(band))
shapes = prv_regions[band]
for shape in shapes:
# private: release_date = 'sometime' says when it will be released
(xlo, xhi, ylo, yhi), mask = pyregion_subset(
shape, hit_mask_private[band], mywcs)
log.debug("{0},{1},{2},{3}: {4}"
.format(xlo, xhi, ylo, yhi, mask.sum()))
hit_mask_private[band][ylo:yhi, xlo:xhi] += shape.meta['integration']*mask
if save_masks:
shapes.write('{0}_band{1}_private.reg'.format(save_prefix, band))
shapes = pub_regions[band]
for shape in shapes:
# public: release_date = '' should mean already released
(xlo, xhi, ylo, yhi), mask = pyregion_subset(
shape, hit_mask_public[band], mywcs)
log.debug("{0},{1},{2},{3}: {4}"
.format(xlo, xhi, ylo, yhi, mask.sum()))
hit_mask_public[band][ylo:yhi, xlo:xhi] += shape.meta['integration']*mask
if save_masks:
shapes.write('{0}_band{1}_public.reg'.format(save_prefix, band))
if save_masks:
for band in bands:
if band in hit_mask_public:
if hit_mask_public[band].any():
hdu = fits.PrimaryHDU(data=hit_mask_public[band],
header=image.header)
hdu.writeto('{0}_band{1}_public.fits'.format(save_prefix, band),
clobber=True)
if band in hit_mask_private:
if hit_mask_private[band].any():
hdu = fits.PrimaryHDU(data=hit_mask_private[band],
header=image.header)
hdu.writeto('{0}_band{1}_private.fits'.format(save_prefix, band),
clobber=True)
fig = aplpy.FITSFigure(fits.HDUList(image), convention='calabretta')
fig.show_grayscale(stretch='arcsinh', vmid=np.nanmedian(image.data))
for band in bands:
if band in hit_mask_public:
if hit_mask_public[band].any():
fig.show_contour(fits.PrimaryHDU(data=hit_mask_public[band],
header=image.header),
levels=integration_time_contour_levels,
colors=[band_colors_pub[int(band)]] * len(integration_time_contour_levels),
linewidth=linewidth,
convention='calabretta')
if band in hit_mask_private:
if hit_mask_private[band].any():
fig.show_contour(fits.PrimaryHDU(data=hit_mask_private[band],
header=image.header),
levels=integration_time_contour_levels,
colors=[band_colors_priv[int(band)]] * len(integration_time_contour_levels),
linewidth=linewidth,
convention='calabretta')
fig.save('{0}_almafinderchart.png'.format(save_prefix))
return image, catalog, hit_mask_public, hit_mask_private
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment