Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save rstofi/1d73ac5a785147bd887856050022a3c0 to your computer and use it in GitHub Desktop.
Save rstofi/1d73ac5a785147bd887856050022a3c0 to your computer and use it in GitHub Desktop.
Code used for a MeerKLASS memo on the importance of OTFM PB correction and the amplitude errors introduced
"""Simple code to re-create OTFM beam smearing plots. For the default plots, and
mathematical background see:
https://iopscience.iop.org/article/10.3847/1538-4357/aaef7c/pdf
Also, the effect is computed and results are re-computed for MeerKAT. Both for
constant RA and constant elevation (MeerKLASS)
The code is based on a circular GAussian beam input for now.
I try to vectorise things rather than parallelize and hope the underlying C code
is fast enough for these things
MeerKLASS general parameters:
https://skaafrica.atlassian.net/wiki/spaces/ESDKB/pages/277315585/MeerKAT+specifications
Author: Kristof Rozgonyi
e-mail: [email protected]
License: MIT
"""
#=== Imports ===
import os, sys
import numpy as np
from scipy.special import erf
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.offsetbox import AnchoredText
from matplotlib.patheffects import withStroke
#Astropy
from astropy.coordinates import Longitude, Latitude, EarthLocation
from astropy.coordinates import SkyCoord, AltAz
from astropy.time import Time
from astropy import units as u
from astropy.coordinates import AltAz
#=== Globals ===
#RCparams for plotting
matplotlib.rcParams['xtick.direction'] = 'in'
matplotlib.rcParams['ytick.direction'] = 'in'
matplotlib.rcParams['xtick.major.size'] = 9
matplotlib.rcParams['ytick.major.size'] = 9
matplotlib.rcParams['xtick.major.width'] = 3
matplotlib.rcParams['ytick.major.width'] = 3
matplotlib.rcParams['xtick.minor.size'] = 6
matplotlib.rcParams['ytick.minor.size'] = 6
matplotlib.rcParams['xtick.minor.width'] = 2
matplotlib.rcParams['ytick.minor.width'] = 2
matplotlib.rcParams['axes.linewidth'] = 2
plt.rcParams['xtick.labelsize']=16
plt.rcParams['ytick.labelsize']=16
#4 sampled colors from viridis
c0 = '#440154';#Purple
c1 = '#30678D';#Blue
c2 = '#35B778';#Greenish
c3 = '#FDE724';#Yellow
# === Telescope related
# The wideband coarse (4k)
N_CHAN = 410 #I average every 10 channels together not to simulate all 4096chans
L_band_PIXELSIZE = 12 # Arcseconds
L_BAND_NU_CENTRAL = 1.285
L_BAND_CW = 0.00209 # 209kHz channelwidth (208.984 kHz)
UHF_band_PIXELSIZE = 14 # Arcseconds
UHF_BAND_NU_CENTRAL = 0.7975
UHF_BAND_CW = 0.000133 # 132.812 kHz
#=== Functions ===
def deg_to_rad(deg):
return deg * (np.pi / 180)
def arcsecond_to_arcmin(arcsec):
return float(arcsec) / 60.
def arcmin_to_deg(arcmin):
return float(arcmin) / 60.
def arcsecond_to_deg(arcsec):
return float(arcsec) / 360.
def FWHM_to_std(FWHM):
"""Convert Gaussian FWHM to standard deviation
https://en.wikipedia.org/wiki/Full_width_at_half_maximum
"""
return FWHM / 2.355
def std_to_FWHM(std):
"""Convert Gaussian FWHM to standard deviation
https://en.wikipedia.org/wiki/Full_width_at_half_maximum
"""
return std * 2.355
def get_pixel_coordinate_array(N,dn):
"""Quick subroutine to compute the grid cell midpoints around zero (one point
is centered at zero) with N points with dn width. If N is even more negative
pints will be computed as this is preffered fro numpy FFT
"""
return np.linspace(-dn*(np.floor(N/2)),dn*np.ceil((N/2-1)),N,endpoint=True)
def get_VLA_PB_FWHM_from_freq(nu):
"""Compute the VLA PB FWHM based on Pereley VLA memo 195 and the
Mooley et al. 2018 paper
Parameters
==========
nu: float
The given frequency in GHz
Returns
=======
The PB FWHM in arcseconds
"""
return (42 * 60) / nu
def get_MeerKAT_PB_FWHM_from_freq(nu):
"""Compute the MeerKAT PB FWHM based on eq. 4 and 5 from Mauch et al. 2019
(deep2 paper). I use the Horizontal PB model scaling
Parameters
==========
nu: float
The given frequency in GHz
Returns
=======
The PB FWHM in arcseconds
"""
return (86.25 * 60) / nu #i.e. (57.5 * 1.5) / nu
def circular_gaussian_beam_value(x, y, sigma=0.5):
"""A simple 2D Gaussian beam value computed on a given image point
Parameters
==========
x: <numpy.ndarray>
The x (RA) directional coordinates relative to the pointing centre
y: <numpy.ndarray>
The y (Dec) directional coordinates relative to the pointing centre
sigma: float, optional
The standard deviation of the circular Gaussian
Returns
=======
z: float
The primary beam value at [x,y]
"""
if np.shape(x) != np.shape(y):
raise ValueError('x and y have different shapes!')
z = np.exp(-(np.power(x,2) + np.power(y,2)) / (2 * np.power(sigma,2)))
return z
def RA_smeared_gaussian_beam_value(x, y, dx, sigma=0.5):
"""Compute the smeared beam (eq. 10) from
https://iopscience.iop.org/article/10.3847/1538-4357/aaef7c/pdf
Parameters
==========
x: <numpy.ndarray>
The x (RA) directional coordinates relative to the pointing centre
y: <numpy.ndarray>
The y (Dec) directional coordinates relative to the pointing centre
dx: float
The max RA (x) coordinate offset of the beam for a given phase centre
sigma: float, optional
The standard deviation of the circular Gaussian
Returns
=======
z: float
The primary beam value at [x,y]
"""
if np.shape(x) != np.shape(y):
raise ValueError('x and y have different shapes!')
if dx == 0:
return circular_gaussian_beam_value(x, y, sigma=sigma)
else:
b0 = np.exp(-(np.power(x,2) + np.power(y,2)) / (2 * np.power(sigma,2)))
A = (sigma / dx) * b0 * np.exp(np.power(x,2) / (2 * np.power(sigma,2))) \
* np.sqrt(np.pi / 2)
z = A * (erf((x + (dx / 2)) / (np.sqrt(2) * sigma)) \
- erf((x - (dx / 2)) / (np.sqrt(2) * sigma)))
return z
def general_smeared_gaussian_beam_value(x, y, dx, dy, sigma=0.5):
"""Compute the general smeared beam computed using a linear approximation
See the memo for the formula and derivation.
Parameters
==========
x: <numpy.ndarray>
The x (RA) directional coordinates relative to the pointing centre
y: <numpy.ndarray>
The y (Dec) directional coordinates relative to the pointing centre
dx: float
The max RA (x) coordinate offset of the beam for a given phase centre
in arcseconds
dx: float
The max Dec (y) coordinate offset of the beam for a given phase centre
in arcseconds
.
sigma: float, optional
The standard deviation of the circular Gaussian
Returns
=======
z: float
The primary beam value at [x,y]
"""
if np.shape(x) != np.shape(y):
raise ValueError('x and y have different shapes!')
if dx == 0 and dy == 0:
return circular_gaussian_beam_value(x, y, sigma=sigma)
else:
ds = np.sqrt(np.power(dx,2) + np.power(dy,2))
A = (sigma / ds) * np.sqrt(np.pi / 2)
s_exp = np.exp(-(np.power(((x * (dy / ds)) + (y * (dx / ds))),2)) \
/ (2 * np.power(sigma,2)))
s_smear = (erf(((x * (dx / ds)) - (y * (dy / ds)) + (ds / 2)) \
/ (np.sqrt(2) * sigma)) \
- erf(((x * (dx / ds)) - (y * (dy / ds)) - (ds / 2)) \
/ (np.sqrt(2) * sigma)))
z = A * s_exp * s_smear
return z
def compute_beam_image(N, dn,
beam_model='circular_gaussian',
sigma=0.5,
dx=None,
dy=None):
"""Generate a regular image of NxN with dn pixel size and generate the given
beam model.
The beam values are computed at the pixel centre points
Parameters
==========
N: int
The size of the sky image
dn: int
The image pixel size
beam_model: str, optional
Name the primary beam model from the following list:
['circular_gaussian', 'RA_smeared_gaussian', 'general_smeared_gaussian']
sigma: float, optional
The standard deviation of the circular Gaussian
dx: float, optional
The max RA (x) coordinate offset of the beam for a given phase centre (scan)
dy: float, optional
The max RA (x) coordinate offset of the beam for a given phase centre (scan)
Returns
=======
x: <numpy.ndarray>
The pixel x coordinates (RA) as a grid (NxN matrix with constant columns)
y: <numpy.ndarray>
The pixel y coordinates(Dec) as a grid (NxN matrix with constant rows)
z: <numpy.ndarray>
The primary beam values as a grid (NxN matrix)
"""
#This is a dirty type solution
N = int(np.ceil(N))
#Extra end at negative values => This is good for numpy FFT
x = get_pixel_coordinate_array(N,dn)
y = get_pixel_coordinate_array(N,dn)
x, y = np.meshgrid(x,y)
if beam_model == 'circular_gaussian':
z = circular_gaussian_beam_value(x,y,sigma=sigma)
elif beam_model == 'RA_smeared_gaussian':
if dx == None:
raise ValueError('no dx is defined!')
z = RA_smeared_gaussian_beam_value(x,y,sigma=sigma,dx=dx)
elif beam_model == 'general_smeared_beam':
if dx == None:
raise ValueError('no dx is defined!')
if dy == None:
raise ValueError('no dy is defined!')
z = general_smeared_gaussian_beam_value(x,y,sigma=sigma,dx=dx,dy=dy)
else:
raise NotImplementedError('Beam model is wrong or not implemented yet')
#else:
# z = circular_gaussian_beam_value(x,y,sigma=sigma)
# z = z - RA_smeared_gaussian_beam_value(x,y,sigma=sigma,dx=dx)
return x, y, z
def get_beam_slice(x, y, z, slice_dir='x', slice_at=0.):
"""Get a slice from a PB image at constant RA or Dec
Parameters
==========
x: <numpy.ndarray>
The pixel x coordinates (RA) as a grid (NxN matrix with constant columns)
y: <numpy.ndarray>
The pixel y coordinates(Dec) as a grid (NxN matrix with constant rows)
z: <numpy.ndarray>
The primary beam values as a grid (NxN matrix)
slice_dir: string, optional
Slice direction i.e. the direction which the slice is returned
ca be 'RA', 'x' or 'Dec', 'y'
e.g if slice_dir = x a slice at constant y is returned
slice_at: float, optional
The constant RA or Dec value in which the slice is taken
Returns
=======
xy_slice: <numpy.ndarray>
The slice pixel (x or y) coordinates (1D array)
z_slice: <numpy.ndarray>
The beam values at the slice (1D array)
"""
#I skip checking size matches
if slice_dir == 'RA' or slice_dir == 'x':
slice_dir_array = x[0,:]
perpendicular_dir_array = y
elif slice_dir == 'Dec' or slice_dir == 'y':
slice_dir_array = y[:,0]
perpendicular_dir_array = x
else:
raise ValueError('Invalid slice direction is given!')
#Get the closest value in array to the given slice at
#I use the fact that the bem is computed on an NxN regular image
slice_at = slice_dir_array[np.argmin(np.abs(
slice_dir_array - slice_at))]
xy_slice = np.copy(slice_dir_array)
z_slice = np.copy(z[perpendicular_dir_array == slice_at])
return xy_slice, z_slice
def fractional_PB_cange_of_single_slice(pixelsize=1,
PB_FWHM=60,
Ndx = 100,
fdx = 0.01,
slice_dir='x',
slice_at=0.,
N_image=None,
beam_model = 'RA_smeared_gaussian'
):
"""Generates fig 2 data matrix from Mooley et al. 2018
Currently only working with the RA smeared beam model
TO DO: make this function more general to work with an arbritary PB model
Parameters
==========
pixelsize: float, opt
The image pixelsize in arcseconds
PB_FWHM: float, opt
The Primary Beam FWHM in arcseconds. Used to calculate the std of the PB
Ndx: int, optional
The number of dx coordinate offset values to test
fdx: float, optional
The fractional change of the x coordinate for a single subscan (one phase
centre) from 0 to Ndx * fdx *PB_FWHM
slice_dir: string, optional
Slice direction i.e. the direction which the slice is returned
ca be 'RA', 'x' or 'Dec', 'y'
e.g if slice_dir = x a slice at constant y is returned
slice_at: float, optional
The constant RA or Dec value in which the slice is taken
N_image: int, optional
If None the image size is 2 * PB_FWHM, otherwise this parameter controls
the image sixe (x axis of the plot)
beam_model: str, optional
Name the (smeared) primary beam model from the following list:
['RA_smeared_gaussian']
Returns
=======
dPB_array: <numpy.ndarray>
The pixel slice coordinates as a grid (N_image x N_dx matrix with
constant columns)
dx_array: <numpy.ndarray>
The generated dx value coordinates as a grid (N_image x N_dx matrix with
constant rows)
PB_err: <numpy.ndarray>
The fractional bema change values as a grid (N_image x N_dx matrix)
"""
#Set up the parameters
PB_sigma = FWHM_to_std(PB_FWHM)
if N_image == None:
N_image = np.ceil((PB_FWHM * 2) / pixelsize)
N_image = int(N_image)
ddx = fdx * PB_FWHM
#Get the output arrays
dx_array = np.arange(ddx,(Ndx * ddx) + 1,ddx)
dPB_array = get_pixel_coordinate_array(N_image,pixelsize)
PB_err = np.zeros((Ndx, N_image))
#Get the reference beam
x0, y0, z0 = compute_beam_image(N = N_image,
dn = pixelsize,
beam_model = 'circular_gaussian',
sigma = PB_sigma)
ref_xy_slice, ref_z_slice = get_beam_slice(x0,y0,z0,
slice_dir=slice_dir, slice_at=slice_at)
#Fill up the PB_err matrix
for i in range(0,Ndx):
x, y, z = compute_beam_image(N = N_image,
dn = pixelsize,
beam_model = beam_model,
sigma = PB_sigma,
dx = dx_array[i])
xy_slice, z_slice = get_beam_slice(x,y,z,
slice_dir=slice_dir, slice_at=slice_at)
PB_err[i,:] = np.divide(z_slice,ref_z_slice)
#Compute the meshgrid coordinates
dPB_array, dx_array = np.meshgrid(dPB_array,dx_array)
del x0, y0, z0, x, y, z, ref_xy_slice, ref_z_slice, xy_slice, z_slice
return dPB_array, dx_array, PB_err
def fractional_PB_cange_with_frequency_of_single_slice(pixelsize=1,
N_image = (14 * 60 * 1.6) + 1,
nu_central = 3,
Nnu = 201,
dnu = 0.01,
PB_scaling_model = 'VLA',
dx = 240,
slice_dir = 'x',
slice_at = 0.,
beam_model = 'RA_smeared_gaussian'
):
"""Generates fig 2 data matrix from Mooley et al. 2018
Currently only working with the RA smeared beam model
TO DO: make this function more general to work with an arbritary PB model
Parameters
==========
pixelsize: float, opt
The image pixelsize in arcseconds
N_image: int, optional
The image size
nu_central: float, opt
The central frequency in GHz
Nnu: int, optional
The number of frequency channels to test
dnu: float, opt
The width of a single frequency channel in GHz
PB_scaling_model: str, opt
The model of how the PB FWHM depends on the observing frequency. Can be
any from the following list: ['VLA', 'MeerKAT']
dx: float, optional
The absoolute change of the x coordinate for a single subscan used for
all frequencies (in arcseconds)
slice_dir: string, optional
Slice direction i.e. the direction which the slice is returned
ca be 'RA', 'x' or 'Dec', 'y'
e.g if slice_dir = x a slice at constant y is returned
slice_at: float, optional
The constant RA or Dec value in which the slice is taken
beam_model: str, optional
Name the (smeared) primary beam model from the following list:
['RA_smeared_gaussian']
Returns
=======
dPB_array: <numpy.ndarray>
The pixel slice coordinates as a grid (N_image x N_dx matrix with
constant columns)
dx_array: <numpy.ndarray>
The generated dx value coordinates as a grid (N_image x N_dx matrix with
constant rows)
PB_err: <numpy.ndarray>
The fractional bema change values as a grid (N_image x N_dx matrix)
"""
N_image = int(np.ceil(N_image))
#Get the output arrays
nu_array = np.add(get_pixel_coordinate_array(Nnu,dnu), nu_central)
dPB_array = get_pixel_coordinate_array(N_image,pixelsize)
PB_err = np.zeros((Nnu, N_image))
#Get the frequency deendent PB FWHM array
PB_FWHM_array = np.zeros(Nnu)
if PB_scaling_model == 'VLA':
for i in range(0,Nnu):
PB_FWHM_array[i] = get_VLA_PB_FWHM_from_freq(nu_array[i])
elif PB_scaling_model == 'MeerKAT':
for i in range(0,Nnu):
PB_FWHM_array[i] = get_MeerKAT_PB_FWHM_from_freq(nu_array[i])
#Fill up the PB_err matrix
for i in range(0,Nnu):
#Get the reference beam
x0, y0, z0 = compute_beam_image(N = N_image,
dn = pixelsize,
beam_model = 'circular_gaussian',
sigma = FWHM_to_std(PB_FWHM_array[i]))
ref_xy_slice, ref_z_slice = get_beam_slice(x0,y0,z0,
slice_dir=slice_dir, slice_at=slice_at)
#Compute the smeared beam
x, y, z = compute_beam_image(N = N_image,
dn = pixelsize,
beam_model = beam_model,
sigma = FWHM_to_std(PB_FWHM_array[i]),
dx = dx)
xy_slice, z_slice = get_beam_slice(x,y,z,
slice_dir=slice_dir, slice_at=slice_at)
PB_err[i,:] = np.divide(z_slice,ref_z_slice)
#Compute the meshgrid coordinates
dPB_array, nu_array = np.meshgrid(dPB_array,nu_array)
del x0, y0, z0, x, y, z, ref_xy_slice, ref_z_slice, xy_slice, z_slice
return dPB_array, nu_array, PB_err
def get_smearing_from_altaz_subscan(t0 = Time('2019-02-24T19:48:28'),
alt0 = 42.,
az0 = 59.6,
slew_velocity = 5.,
slew_direction = -1,
dt = 2.,
telescope = 'MeerKAT',
separation_only=False):
"""Compute the smeared beam from the altaz parameters pelescope pointing
position and sub-scan integration time
==========
t0: <astropy.Time>, optional
The the observation time in UTC with y-m-dTh:m:s. format at the time
centroid of the sub-scan
alt0: float, optional
The altitude of the observation at the phase centre
az0: float, optional
The azimuth at the observation at the phase centre
slew_velocity: float, optional
The slew velocity in arcseconds (!). Defined in the azimutal direction
slew_direction: int, optional
The direction of slewing. Can be [+1,-1]
dt: float, optional
The sub-scan integration time
telescope: str, optional
The telescope used for the subscan simulations. Possible values are:
['MeerKAT']
separetion_only: bool, optional
If true, onlt the Delta s angular separation is returned
Returns
=======
dRA: float
The RA separation for the sub-scan (corrected for Dec) in arcseconds
dDec: float
The Dec separation for the sub-scan in arcseconds
"""
if telescope == 'MeerKAT':
location = EarthLocation.from_geodetic(
Longitude('21:26:38.0', u.degree, wrap_angle=180. * u.degree, copy=False),
Latitude('-30:42:39.8', u.degree, copy=False),
height=u.Quantity(1086.6, u.m, copy=False))
else:
raise NotImplementedError('Only MeerKAT telescope is implemented yet')
#Convert slew velocity to degrees
slew_velocity_deg = arcmin_to_deg(slew_velocity)
dt_half = (dt / 2)
t1 = t0 - (dt_half * u.second)
t2 = t0 + (dt_half * u.second)
az1 = az0 - (slew_direction * np.multiply(slew_velocity_deg, dt_half))
az2 = az0 + (slew_direction * np.multiply(slew_velocity_deg, dt_half))
centre_icrs_coordinates = SkyCoord(alt = alt0 * u.deg,
az = az0 * u.deg,
obstime = t0,
frame = 'altaz',
location = location).transform_to('icrs')
start_icrs_coordinates = SkyCoord(alt = alt0 * u.deg,
az = az1 * u.deg,
obstime = t1,
frame = 'altaz',
location = location).transform_to('icrs')
end_icrs_coordinates = SkyCoord(alt = alt0 * u.deg,
az = az2 * u.deg,
obstime = t2,
frame = 'altaz',
location = location).transform_to('icrs')
dra = (end_icrs_coordinates.ra-start_icrs_coordinates.ra).arcsec \
* np.cos(centre_icrs_coordinates.dec.rad)
dDec = (end_icrs_coordinates.dec-start_icrs_coordinates.dec).arcsec
sep = end_icrs_coordinates.separation(start_icrs_coordinates).arcsec
if separation_only:
return sep
else:
return dra, dDec
#=== Survey and scanning pattern defining functions ===
def get_stripe_coordinate_values_constant_az_slew(t0 = Time('2019-02-24T19:48:28'),
alt0 = 42.,
az0 = 59.6,
slew_velocity = 5.,
slew_direction = -1,
dt = 2.,
T_stripe = 216.,
t0_rel = 0.,
telescope = 'MeerKAT',
frame = 'icrs'):
"""This function returns a relative time array and the corresponding RA and
Dec arrays for a given sstripe.
The stripe slew is performed with a constant altitude (elevation) pointing and so
slewing only in the azimuthal direction.
Parameters
==========
t0: <astropy.Time>, optional
The start of the observation time in UTC with y-m-dTh:m:s. format
alt0: float, optional
The altitude of the observation at t0
az0: float, optional
The azimuth at the observation at t0
slew_velocity: float, optional
The slew velocity in arcseconds (!). Defined in the azimutal direction
slew_direction: int, optional
The direction of slewing. Can be [+1,-1]
dt: float, optional
The visibility sampling time in seconds
T_stripe: float, optional
The time which under a single stripe is performed
t0_rel: float, optional
The relative start time of the observation (used for several stripe)
telescope: str, optional
The telescope used for the stripe simulations. Possible values are:
['MeerKAT']
frame: stra, optional
Available frames: ['icrs','altaz']
Returns
=======
t_stripe: <numpy.ndarray>
The time values of the stripe relative to the observation's starting
time in seconds.
x_stripe: <numpy.ndarray>
The RA (icrs) or Alt (altaz) values of the stripe in degrees
y_stripe: <numpy.ndarray>
The Dec (icrs) or Az (altaz) values of the stripe in degrees
ti: <astropy.Time>
The last time sample's UTC time stamp (to be used as an input for the next
stripe)
azi: float
The last ime sample's azimuth in degrees (to be used as an input for
the next stripe)
"""
if telescope == 'MeerKAT':
location = EarthLocation.from_geodetic(
Longitude('21:26:38.0', u.degree, wrap_angle=180. * u.degree, copy=False),
Latitude('-30:42:39.8', u.degree, copy=False),
height=u.Quantity(1086.6, u.m, copy=False))
else:
raise NotImplementedError('Only MeerKAT telescope is implemented yet')
#Convert slew velocity to degrees
slew_velocity_deg = arcmin_to_deg(slew_velocity)
#Get the relative time array
t_stripe = np.arange(t0_rel,T_stripe,dt)
#Get alt az vectors
alt_stipe = alt0 * np.ones(np.size(t_stripe))
az_stipe = az0 + (slew_direction * np.multiply(slew_velocity_deg, t_stripe))
t_obs_stripe = t0 + t_stripe * u.second
#Convert to RA Dec
stripe_altaz_coordinates = SkyCoord(alt = alt_stipe * u.deg,
az = az_stipe * u.deg,
obstime = t_obs_stripe,
frame = 'altaz',
location = location)
if frame == 'altaz':
x_stripe = stripe_altaz_coordinates.alt.deg
y_stripe = stripe_altaz_coordinates.az.deg
elif frame == 'icrs':
stripe_icrs_coordinates = stripe_altaz_coordinates.transform_to('icrs')
x_stripe = stripe_icrs_coordinates.ra.deg
y_stripe = stripe_icrs_coordinates.dec.deg
else:
raise NotImplementedError('Only icrs and altaz frames are implemented')
ti = t_obs_stripe[-1]
#alti = stripe_altaz_coordinates[-1].alt.deg
azi = stripe_altaz_coordinates[-1].az.deg
return t_stripe, x_stripe, y_stripe, ti, azi
def get_field_scan_coordinate_values_constant_az_slew(N_scan = 26,
t0 = Time('2019-02-24T19:48:28'),
alt0 = 42.,
az0 = 59.6,
slew_velocity = 5.,
slew_direction = -1,
dt = 2.,
T_stripe = 216.,
t0_rel = 0.,
telescope = 'MeerKAT',
frame = 'icrs'):
"""Create the full scan of the target field, defined via a starting stripe
and the number of consequent stripe.
Parameters
==========
N_scan: int, optional,
The number of stripes used to cover the field simulated
t0: <astropy.Time>, optional
The start of the observation time in UTC with y-m-dTh:m:s. format
alt0: float, optional
The altitude of the observation at t0
az0: float, optional
The azimuth at the observation at t0
slew_velocity: float, optional
The slew velocity in arcseconds (!). Defined in the azimutal direction
slew_direction: int, optional
The direction of slewing. Can be [+1,-1]
dt: float, optional
The visibility sampling time in seconds
T_stripe: float, optional
The time which under a single stripe is performed
t0_rel: float, optional
The relative start time of the observation (used for several stripes)
telescope: str, optional
The telescope used for the stripe simulations. Possible values are:
['MeerKAT']
frame: stra, optional
Available frames: ['icrs','altaz']
Returns
=======
t_field: <numpy.ndarray>
The time values of the full field scans relative to the observation's
starting time in seconds.
x_field: <numpy.ndarray>
The RA (icrs) or Alt (altaz) values of the full field scans in degrees
y_field: <numpy.ndarray>
The Dec (icrs) or Az (altaz) values of the full field scans in degrees
"""
#Define the output arrays
t_field = np.arange(t0_rel,T_stripe * N_scan,dt)
x_field = np.zeros(np.size(t_field))
y_field = np.zeros(np.size(t_field))
#Loop through the stripes
ti = t0
azi = az0
for i in range(0,N_scan):
t_stripe, x_stripe, y_stripe, ti, azi = \
get_stripe_coordinate_values_constant_az_slew(
t0 = ti,
alt0 = alt0,
az0 = azi,
slew_velocity = slew_velocity,
slew_direction = slew_direction,
dt = dt,
T_stripe = T_stripe,
t0_rel = t0_rel,
telescope = telescope,
frame = frame)
#ti_rel += T_stripe * dt
slew_direction *= -1
stripe_size = np.size(t_stripe) #This is ugly but I am mad atm for somethiong else so I dont care...
counter = i * stripe_size
t_field[counter:counter + stripe_size] = t_stripe + (i * T_stripe)
x_field[counter:counter + stripe_size] = x_stripe
y_field[counter:counter + stripe_size] = y_stripe
return t_field, x_field, y_field
#=== Plotting functions ===
def add_inner_title(ax, title, loc, prop=None, white_border=True, **kwargs):
"""Create a fancy inner title for plots
Parameters
==========
ax: `matplotlib.axex`
The axis which the inner title will be created
title: str
The title created
loc: int
Location. Can be verbose as well I think...
prop: dict, optional
A dictionary containing the properties of the title
eg. dict(size=18, color='green')
white_border: bool, optional
If True, no white borders are drawn around the text
Return
======
at: artist
The inner title artist that is added to the plot
"""
if prop is None:
prop = dict(size=plt.rcParams['legend.fontsize'])
at = AnchoredText(title, loc=loc, prop=prop,
pad=0., borderpad=0.75,
frameon=False, **kwargs)
ax.add_artist(at)
if white_border:
at.txt._text.set_path_effects([withStroke(foreground="w", linewidth=3)])
return at
def plot_beam(x,y,z,ptitle=None):
"""Create a simple beam plot
Parameters
==========
x, y, z: <numpy.ndarray>
Output positional and beam value vectors from `compute_beam_image`
ptitle: str
The title of the plot
Returns
=======
Create an image
"""
fig = plt.figure(1, figsize=(12,12))
if ptitle != None:
fig.suptitle('Primary beam model: {0:s}'.format(ptitle), fontsize = 18)
plt.gca().set_aspect('equal')
beam = plt.pcolor(x, y, z, rasterized = True, shading='auto')
cb = plt.colorbar(beam, aspect=30, fraction=0.04975, pad=0)
plt.rcParams['contour.negative_linestyle'] = 'solid'
cb.ax.yaxis.get_offset_text().set_fontsize(18)
cb.ax.tick_params(labelsize=18)
cb.ax.tick_params(direction='in', length=6, width=2)
cb.ax.set_ylabel(r'B', fontsize = 18)
CS = plt.contour(x, y, z,
levels = [np.amax(z)/2,np.amax(z)*2/3],
colors='w', alpha=1.)
plt.clabel(CS, fontsize=14, inline=True)
plt.xlabel(r'x', fontsize = 18)
plt.ylabel(r'y', fontsize = 18)
plt.show()
plt.close()
def plot_fractional_PB_change(dPB_array, dx_array, PB_err,
fname,
ptitle=None,
PB_FWHM = 60,
PB_normalisation = True,
cont_levels = [],
half_beam = True):
"""Generates fig 2 plot from Mooley et al. 2018
Currently only working with the RA smeared beam model
TO DO: make this function more general to work with an arbritary PB model
for more info see `fractional_PB_cange_of_single_slice()`
Parameters
==========
dPB_array: <numpy.ndarray>
The pixel slice coordinates as a grid (N_image x N_dx matrix with
constant columns)
dx_array: <numpy.ndarray>
The generated dx value coordinates as a grid (N_image x N_dx matrix with
constant rows)
PB_err: <numpy.ndarray>
The fractional bema change values as a grid (N_image x N_dx matrix)
fname: str
The full path and name and extension of the output image
ptitle: str, opt
The title of the plot
PB_FWHM: float, opt
The Primary Beam FWHM in arcseconds. Used to calculate the std of the PB
PB_normalisation: bool, opt
If True the plots x and y axis will be normalised by the PB FWHM
cont_levels: list, optional
The contour level values (default values if not given)
half_beam: bool, optional
If True, only half of the beam is being polotted due to symmetry reasons
Returns
=======
Create an image
"""
fig = plt.figure(1, figsize=(9,4.5))
if ptitle != None:
plt.title(ptitle, fontsize = 18)
if PB_normalisation:
beam = plt.pcolor(dPB_array / PB_FWHM, dx_array / PB_FWHM, PB_err,
rasterized=True, shading='auto')
else:
beam = plt.pcolor(dPB_array, dx_array, PB_err,
rasterized=True, shading='auto')
cb = plt.colorbar(beam, aspect=30, fraction=0.04975, pad=0)
cb.ax.yaxis.get_offset_text().set_fontsize(18)
cb.ax.tick_params(labelsize=18)
cb.ax.tick_params(direction='in', length=6, width=2)
cb.ax.set_ylabel(r'B$_{eff, \nu_0}$/B$_{0, \nu_0}$', fontsize = 18)
#Contours
if len(cont_levels) == 0:
CS = plt.contour(dPB_array / PB_FWHM, dx_array / PB_FWHM, PB_err,
colors='w', alpha=1.)
else:
CS = plt.contour(dPB_array / PB_FWHM, dx_array / PB_FWHM, PB_err,
levels = cont_levels,
colors='w', alpha=1.)
plt.clabel(CS, fontsize=14, inline=True)
if PB_normalisation:
plt.xlabel(r's$_0$/$\Theta_{B_{0, \nu_0}}$', fontsize = 18)
plt.ylabel(r'$\Delta$s/$\Theta_{B_{0, \nu_0}}$', fontsize = 18)
else:
plt.xlabel(r's$_0$', fontsize = 18)
plt.ylabel(r'$\Delta$s', fontsize = 18)
if half_beam:
plt.xlim(0)
#plt.show()
plt.savefig(fname, bbox_inches='tight')
plt.close()
def plot_fractional_PB_change_with_frequency(dPB_array, nu_array, PB_err,
fname,
ptitle=None,
PB_FWHM = None,
PB_normalisation = True,
cont_levels = [],
half_beam = True,
inner_title = None):
"""Generates fig 2 plot from Mooley et al. 2018
Currently only working with the RA smeared beam model
TO DO: make this function more general to work with an arbritary PB model
for more info see `fractional_PB_cange_of_single_slice()`
Parameters
==========
dPB_array: <numpy.ndarray>
The pixel slice coordinates as a grid (N_image x N_nu matrix with
constant columns)
nu_array: <numpy.ndarray>
The generated frequency value coordinates as a grid (N_image x N_nu
matrix with constant rows) given in GHz
PB_err: <numpy.ndarray>
The fractional bema change values as a grid (N_image x N_nu matrix)
fname: str
The full path and name and extension of the output image
ptitle: str, opt
The title of the plot
PB_FWHM_reference: float, opt
The Primary Beam FWHM for a given ref frequency in arcseconds.
PB_normalisation: bool, opt
If True the plots x and y axis will be normalised by the PB FWHM
cont_levels: list, optional
The contour level values (default values if not given)
half_beam: bool, optional
If True, only half of the beam is being polotted due to symmetry reasons
Returns
=======
Create an image
"""
#get the PB at the lowest frequency
if PB_FWHM == None:
PB_FWHM = get_VLA_PB_FWHM_from_freq(np.amin(nu_array))
fig = plt.figure(1, figsize=(9,4.5))
if ptitle != None:
plt.title(ptitle, fontsize = 18)
if PB_normalisation:
beam = plt.pcolor(dPB_array / PB_FWHM, nu_array, PB_err,
rasterized=True, shading='auto')
else:
beam = plt.pcolor(dPB_array, nu_array, PB_err,
rasterized=True, shading='auto')
cb = plt.colorbar(beam, aspect=30, fraction=0.04975, pad=0)
cb.ax.yaxis.get_offset_text().set_fontsize(18)
cb.ax.tick_params(labelsize=18)
cb.ax.tick_params(direction='in', length=6, width=2)
cb.ax.set_ylabel(r'B$_{eff}$/B$_0$', fontsize = 18)
#Contours
if len(cont_levels) == 0:
CS = plt.contour(dPB_array / PB_FWHM, nu_array, PB_err,
colors='w', alpha=1.)
else:
CS = plt.contour(dPB_array / PB_FWHM, nu_array, PB_err,
levels = cont_levels,
colors='w', alpha=1.)
plt.clabel(CS, fontsize=14, inline=True)
if PB_normalisation:
plt.xlabel(r's$_0$/$\Theta_{B,\nu_{0}}$', fontsize = 18)
else:
plt.xlabel(r's$_0$', fontsize = 18)
plt.ylabel(r'$\nu$ [GHz]', fontsize = 18)
if half_beam:
plt.xlim(0)
if inner_title != None:
ax = plt.gca()
add_inner_title(ax, inner_title, loc=3, prop=dict(size=16))
#plt.show()
plt.savefig(fname, bbox_inches='tight')
plt.close()
def plot_field_scan(RA_field,
Dec_field,
fname,
ptitle = '',
setting = False,
RA_field_setting = None,
Dec_field_setting = None):
"""Plot a scan pattern of a field, both the rising and setting scans if the
latter is provided.
Basicaly creates similar figures as Fig. 3 in Wang et aql. 2021
Parameters
==========
RA_field: <numpy.ndarray>
The RA values of the full field scans in degrees
Dec_field: <numpy.ndarray>
The Dec values of the full field scans in degrees
fname: str
The full path and name and extension of the output image
ptitle: str, opt
The title of the plot
setting: bool, optional
If True, the setting scanning pattern is also drawn
RA_field_setting: <numpy.ndarray>, optional
The RA values of the full field setting scans in degrees
Dec_field_setting: <numpy.ndarray>, optional
The Dec values of the full field setting scans in degrees
Returns
=======
Create an image
"""
if setting:
if np.shape(RA_field) != np.shape(RA_field_setting) or \
np.shape(Dec_field) != np.shape(Dec_field_setting):
raise ValueError("Input scan coordinate lengths are different!")
fig = plt.figure(1, figsize=(9,4.5))
if ptitle != None:
plt.title(ptitle, fontsize = 18)
if setting:
plt.plot(RA_field, Dec_field, '-', c=c0, lw=2., alpha=1., label='rising')
plt.plot(RA_field_setting, Dec_field_setting, '-', c=c2, lw=2., alpha=1.,
label='setting')
legend = plt.legend(loc='upper right', fontsize=18, framealpha=1., edgecolor='black')
legend.get_frame().set_linewidth(2.)
else:
plt.plot(RA_field, Dec_field, '-', c=c0, lw=2., alpha=1.)
ax = plt.gca()
ax.invert_xaxis()
plt.ylabel(r'Dec (J2000) [deg]', fontsize = 18)
plt.xlabel(r'RA (J2000) [deg]', fontsize = 18)
plt.savefig(fname, bbox_inches='tight')
plt.close()
#=== FUNCTIONS GENERATING THE ANALYTIC PLOTS ===
def VLA_OTFM_plots():
"""Re-create the Mooley et al. 2018 Figs 2. & 3.
More precisely, fig 3. is not re-created but a similar figure with more
physical mening/relevance is shown. Where the x axis uses a fixed PB
FWHM (the smallest one), as the pointings are frequency dependent.
This is basically a check that our code works as intended
"""
telescope = 'VLA'
pixelsize = 2.5 #This equals the synthesised beam size for now
dx = 4 * 60 #The absolute RA offset within each subscan
nu_central = 3.
N_chan = 201
chan_width = 0.01
#PB_FWHM = 60 * 10 #The PB FWHM in arcseconds at 3 GHz
PB_FWHM = get_VLA_PB_FWHM_from_freq(nu_central)
#Get PB values
nu_array = get_pixel_coordinate_array(N_chan,chan_width) + nu_central
#=== Fractional beam change ===
contours = [0.825,0.85,0.875,0.9,0.925,0.95,0.97,0.98,0.99,0.995,1.,\
1.005,1.01,1.02,1.03,1.05,1.1,1.2,1.3]
#RA
dPB_array, dx_array, PB_err = fractional_PB_cange_of_single_slice(
pixelsize = pixelsize,
PB_FWHM = PB_FWHM,
Ndx = 99, #So it goes up to dx=0.1
fdx = 0.01,
slice_dir='x',
slice_at=0.,
N_image=((PB_FWHM * 1.6) / pixelsize) + 1) #So I plot up to the +/- 0.8
#of the PB FWHM
parameters_string = r'{0:s} OTFM at {1:0.1f} GHz with' \
'$\Theta_B$={2:.1f}\' & $\Theta_s$={3:.1f}\" RA slice'.format(
telescope,nu_central,PB_FWHM / 60,pixelsize)
fname = './figures/{0:s}_fractional_beam_change_RA_slice.pdf'.format(
telescope)
plot_fractional_PB_change(dPB_array, dx_array, PB_err,
fname = fname,
ptitle = parameters_string,
PB_FWHM = PB_FWHM,
cont_levels=contours)
#=== Frequency dependence ===
contours = [0.97,0.98,0.99,0.995,1.,1.005,1.01,1.02,1.03,1.04,1.06]
#Normalising with max freq
nu_ext = np.amax(nu_array)
nu_ext_PB_FWHM = get_VLA_PB_FWHM_from_freq(nu_ext)
dPB_array, dnu_array, PB_err = fractional_PB_cange_with_frequency_of_single_slice(
pixelsize = pixelsize,
N_image = ((nu_ext_PB_FWHM * 1.6) / pixelsize) + 1,
nu_central = nu_central,
Nnu = N_chan,
dnu = chan_width,
dx = dx,
PB_scaling_model = telescope,
slice_dir='x')
parameters_string = r'{0:s} OTFM with ' \
r'$\Delta$x={1:.1f}'"'"r' & $\nu$$_0$={2:.1f} GHz RA slice'.format(
telescope, dx/60, nu_ext)
fname = './figures/\
freq_dependent_{0:s}_fractional_beam_change.pdf'.format(telescope)
plot_fractional_PB_change_with_frequency(dPB_array, dnu_array, PB_err,
fname = fname,
ptitle = parameters_string,
PB_FWHM = nu_ext_PB_FWHM,
cont_levels = contours)
def plot_example_MeerKLASS_field_scans():
"""Create a similar figure for the simulated scans as in Wang et al. 2021
Fig. 3.
"""
t_field_rising, x_field_rising, y_field_rising = \
get_field_scan_coordinate_values_constant_az_slew()
t_field_setting, x_field_setting, y_field_setting = \
get_field_scan_coordinate_values_constant_az_slew(
t0 = Time('2019-02-25T00:40:11'),
alt0 = 41.,
az0=-43.)
plot_field_scan(RA_field = x_field_rising,
Dec_field = y_field_rising,
fname = './figures/example_scan_coverage.pdf',
ptitle = '',
setting = True,
RA_field_setting = x_field_setting,
Dec_field_setting = y_field_setting)
def MeerKLASS_OTFM_plots(band='L'):
"""Create the OTFM smearing plots for MeerKLASS
"""
telescope = 'MeerKAT'
#From the MeerKLass setup i.e Wang et al. 2021
if band == 'L':
pixelsize = L_band_PIXELSIZE
nu_central = L_BAND_NU_CENTRAL
N_chan = N_CHAN
chan_width = L_BAND_CW
elif band == 'UHF':
pixelsize = UHF_band_PIXELSIZE
nu_central = UHF_BAND_NU_CENTRAL
N_chan = N_CHAN
chan_width = UHF_BAND_CW
else:
raise ValueError('Invalid band name (S is not supported)!')
#I use the top frequency of the band as discussed in the memo
#PB_FWHM = get_MeerKAT_PB_FWHM_from_freq(nu_central)
#Get PB values
nu_array = get_pixel_coordinate_array(N_chan,chan_width) + nu_central
PB_FWHM = get_MeerKAT_PB_FWHM_from_freq(np.amax(nu_array))
#=== Fractional beam change ===
contours = [0.85,0.9,0.95,0.99,1.,1.01,1.05,1.1,1.2,1.3,1.4,1.5]
dPB_array, dx_array, PB_err = fractional_PB_cange_of_single_slice(
pixelsize = pixelsize,
PB_FWHM = PB_FWHM,
Ndx = 99, #So it goes up to dx=0.1
fdx = 0.01,
slice_dir='x',
slice_at=0.,
N_image=((PB_FWHM * 1.6) / pixelsize) + 1) #So I plot up to the +/- 0.8
#of the PB FWHM
#parameters_string = r'Fractional PB cange for {0:s} OTFM at {1:0.1f} GHz with' \
# '$\Theta_B$={2:.1f}\' & $\Theta_s$={3:.1f}\" x slice'.format(
# telescope,nu_central,PB_FWHM / 60,pixelsize)
fname = './figures/{0:s}_fractional_beam_change_{1:s}.pdf'.format(
telescope, band)
plot_fractional_PB_change(dPB_array, dx_array, PB_err,
fname = fname,
ptitle = None,
PB_FWHM = PB_FWHM,
cont_levels=contours)
#=== Frequency dependence ===
contours = [0.9,0.925,0.95,0.975,0.99,1.,1.01,1.05,1.1,1.2]
#Normalising with max freq
nu_ext = np.amax(nu_array)
nu_ext_PB_FWHM = get_MeerKAT_PB_FWHM_from_freq(nu_ext)
for dt in [2.,4.,6.,8.,10.,12.]:
#Get ds for the frequncy dependence plots
#Rising field parameters
dRA, dDec = get_smearing_from_altaz_subscan(t0 = Time('2019-02-24T19:48:28'),
alt0 = 42.,
az0 = 59.6,
slew_velocity = 5.,
slew_direction = -1,
dt = dt)
ds = np.sqrt(np.power(dRA,2) + np.power(dDec,2))
dPB_array, dnu_array, PB_err = fractional_PB_cange_with_frequency_of_single_slice(
pixelsize = pixelsize,
N_image = ((nu_ext_PB_FWHM * 1.6) / pixelsize) + 1,
nu_central = nu_central,
Nnu = N_chan,
dnu = chan_width,
dx = ds,
PB_scaling_model = telescope,
slice_dir='x')
#parameters_string = r'{0:s} OTFM with ' \
#r'$\Delta$x={1:.1f}'"'"r' & $\nu$$_0$={2:.1f} GHz RA slice'.format(
# telescope, dx/60, nu_ext)
fname = './figures/\
{0:s}_freq_dependent_fractional_beam_change_ds_{1:d}_{2:s}.pdf'.format(telescope,
int(dt), band)
plot_fractional_PB_change_with_frequency(dPB_array, dnu_array, PB_err,
fname = fname,
ptitle = '',
PB_FWHM = nu_ext_PB_FWHM,
cont_levels = contours,
inner_title = \
r"$\Delta$T = {0:d}s & $\Delta$s = {1:.2f}".format(
int(dt),ds / nu_ext_PB_FWHM) \
+ r" $\Theta_{B,\nu_{0}}$")
def print_PB_parameters_and_smearing_difference(band='L'):
"""Just print out various useful parameters for the discussion
"""
telescope = 'MeerKAT'
#From the MeerKLass setup i.e Wang et al. 2021
if band == 'L':
pixelsize = L_band_PIXELSIZE
#nu_central = 1.284
nu_central = L_BAND_NU_CENTRAL
N_chan = N_CHAN
chan_width = L_BAND_CW
elif band == 'UHF':
pixelsize = UHF_band_PIXELSIZE
nu_central = UHF_BAND_NU_CENTRAL
N_chan = N_CHAN
chan_width = UHF_BAND_CW
else:
raise ValueError('Invalid band name (S is not supported)!')
print('Selected observing band: {0:s}'.format(band))
#Get PB values
nu_array = get_pixel_coordinate_array(N_chan,chan_width) + nu_central
print('The PB FWHM at min freq: {0:.2f} arcminutes'.format(
arcsecond_to_arcmin(get_MeerKAT_PB_FWHM_from_freq(np.amin(nu_array)))))
print('The PB FWHM at max freq: {0:.2f} arcminutes'.format(
arcsecond_to_arcmin(get_MeerKAT_PB_FWHM_from_freq(np.amax(nu_array)))))
#Compute the difference for the rising and setting skyes
for dt in [2.,4.,6.,8.,10.,12.]:
#Get ds for the frequncy dependence plots
#Rising field parameters
dRA, dDec = get_smearing_from_altaz_subscan(
t0 = Time('2019-02-24T19:48:28'),
alt0 = 42.,
az0 = 59.6,
slew_velocity = 5.,
slew_direction = -1,
dt = dt)
ds = np.sqrt(np.power(dRA,2) + np.power(dDec,2))
#Setting field parameters
dRA_setting, dDec_setting = get_smearing_from_altaz_subscan(
t0 = Time('2019-02-25T00:40:11'),
alt0 = 41.,
az0=-43.,
dt = dt)
ds_setting = np.sqrt(np.power(dRA_setting,2) + np.power(dDec_setting,2))
print('For dT={0:d}s the rising and setting smearings are {1:.2f} and \
{2:.2f} arcminutes, with a difference of {3:.2f} arcminutes'.format(
int(dt),
arcsecond_to_arcmin(ds),
arcsecond_to_arcmin(ds_setting),
arcsecond_to_arcmin(np.fabs(np.subtract(ds,ds_setting)))))
#=== MAIN ===
if __name__ == "__main__":
# Need to have a figures/ subdir
#VLA_OTFM_plots()
#plot_example_MeerKLASS_field_scans()
#MeerKLASS_OTFM_plots(band='L')
#print_PB_parameters_and_smearing_difference(band='L')
MeerKLASS_OTFM_plots(band='UHF')
print_PB_parameters_and_smearing_difference(band='UHF')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment