Skip to content

Instantly share code, notes, and snippets.

@Sunmish
Last active June 14, 2022 07:27
Show Gist options
  • Save Sunmish/6debab18866de83480b0de7ff542cbfb to your computer and use it in GitHub Desktop.
Save Sunmish/6debab18866de83480b0de7ff542cbfb to your computer and use it in GitHub Desktop.
Merge source-lists to create a catalogue.
#! /usr/bin/env python
from argparse import ArgumentParser
import os
import numpy as np
from astropy.io import fits
from astropy.table import Table, vstack
from astropy.coordinates import SkyCoord
from astropy import units as u
REMOVE_COLS = [
"Source_id",
"Isl_id",
"Gaus_id",
"Wave_id",
"Xposn",
"E_Xposn",
"Yposn",
"E_Yposn",
"Maj_img_plane",
"E_Maj_img_plane",
"Min_img_plane",
"E_min_img_plane",
"PA_img_plane",
"E_PA_img_plane",
"DC_PA_img_plane",
"E_DC_PA_img_plane",
"DC_Maj_img_plane",
"E_DC_Maj_img_plane",
"DC_Min_img_plane",
"E_DC_Min_img_plane",
"Isl_mean",
"Residual_Isl_mean"
]
class PrimeCatalogue():
def __init__(self, catalogue,
duplicate_sep=None,
ra_key="RA",
dec_key="DEC",
major_key="Maj"):
self.table = Table.read(catalogue)
self.ra = ra_key
self.dec = dec_key
self.coords = SkyCoord(ra=self.table[self.ra],
dec=self.table[self.dec]
)
self.duplicate_sep = duplicate_sep
self.duplicate = np.full((len(self.table),), False, dtype=bool)
def merge_with_prime(self, catalogue, declination_coords=None,
major_key=None, major_multiplier=1.):
"""Merge secondary catalogue with primary catalogue.
Matching is to ensure duplicates are removed (from the secondary) and
the remainder of the table is appended to the primary catalogue.
"""
table = Table.read(catalogue)
coords = SkyCoord(ra=table[self.ra],
dec=table[self.dec]
)
if declination_coords is not None:
table, coords = PrimeCatalogue.trim_declination(
table=table,
declination_range=declination_coords,
ra=self.ra,
dec=self.dec
)
idx, seps, _ = coords.match_to_catalog_sky(self.coords)
nidx, nseps, _ = coords.match_to_catalog_sky(self.coords, nthneighbor=2)
if major_key is not None:
duplicate_sep1 = table[major_key] / 2. / major_multiplier
duplicate_sep2 = self.table[major_key] / 2. / major_multiplier
cond1 = seps.value < duplicate_sep1
cond2 = seps.value < duplicate_sep2[idx]
temp_idx1 = np.where(cond1 & cond2)[0]
temp_idx2 = idx[temp_idx1]
temp_idx = np.array([table["Tile_centre_sep"][temp_idx1],
self.table["Tile_centre_sep"][temp_idx2]]).argmin(axis=0)
bool1 = np.full_like(idx, False, dtype=bool)
bool2 = np.full_like(self.coords.ra.value, False, dtype=bool)
include_idx1 = temp_idx1[np.where(temp_idx == 1)[0]]
include_idx2 = temp_idx2[np.where(temp_idx == 0)[0]]
bool1[include_idx1] = True
bool2[include_idx2] = True
self.table = vstack([self.table[~bool2], table[~bool1]])
elif self.duplicate_sep is not None:
duplicate_sep = self.duplicate_sep
temp_idx1 = np.where(seps.value < duplicate_sep)[0]
temp_idx2 = idx[temp_idx1]
temp_idx = np.array([table["Tile_centre_sep"][temp_idx1],
self.table["Tile_centre_sep"][temp_idx2]]).argmin(axis=0)
bool1 = np.full_like(idx, False, dtype=bool)
bool2 = np.full_like(self.coords.ra.value, False, dtype=bool)
include_idx1 = temp_idx1[np.where(temp_idx == 1)[0]]
include_idx2 = temp_idx2[np.where(temp_idx == 0)[0]]
bool1[include_idx1] = True
bool2[include_idx2] = True
self.table = vstack([self.table[~bool2], table[~bool1]])
# self.duplicate = vstack([self.duplicate[~bool2],
# np.full((len(table[~bool1]),), False, dtype=bool)
# ])
else:
self.table = vstack([self.table, table])
self.coords = SkyCoord(ra=self.table[self.ra],
dec=self.table[self.dec]
)
def _trim_declination(self, declination_range):
"""Trim catalogue based on max and minimum declination."""
self.table, self.coords = PrimeCatalogue.trim_declination(
table=self.table,
declination_range=declination_range,
ra=self.ra,
dec=self.dec
)
def trim_gp(self, gp_range):
"""Trim catalogue based on Galactic latitude."""
cond1 = self.coords.galactic.b.value < gp_range[0]
cond2 = self.coords.galactic.b.value > gp_range[1]
self.table = self.table[np.where(cond1 | cond2)[0]]
self.coords = SkyCoord(ra=self.table[self.ra],
dec=self.table[self.dec]
)
def add_galactic_coords(self, l="Gal_lon", b="Gal_lat"):
"""Add coordinate columns for Galactic latitude and longitude."""
self.table.add_column(np.asarray(self.coords.galactic.l.value)*u.deg, name=l)
self.table.add_column(np.asarray(self.coords.galactic.b.value)*u.deg, name=b)
def writeout_stats(self):
"""Print out some statistics about the current prime catalogue."""
n_sources = len(self.table)
print("N sources: {}".format(n_sources))
def writeout_prime(self, outname):
remove_cols(self.table)
self.table.write(outname, format="fits", overwrite=True)
@staticmethod
def trim_declination(table, declination_range, ra, dec):
cond1 = table["DEC"] > declination_range[0]
cond2 = table["DEC"] < declination_range[1]
table = table[np.where(cond1 & cond2)[0]]
coords = SkyCoord(ra=table[ra], dec=table[dec])
return table, coords
def remove_cols(table):
for col in REMOVE_COLS:
if col in table.columns:
table.remove_column(col)
def match_all(catalogues, sep, outname=None,
declination_range=None,
gp_range=None,
ra_key="RA",
dec_key="DEC",
major_key="Maj",
major_multiplier=1.,
dec_range2=None,
add_galactic=False):
if len(catalogues) < 2:
raise RuntimeError("You need at least two catalogues to merge!")
if outname is None:
outname = catalogues[0].rstrip(".fits") + "_merged.fits"
print(">>> Initialising prime catalogue with {}".format(catalogues[0]))
prime_catalogue = PrimeCatalogue(catalogues[0],
duplicate_sep=sep,
ra_key=ra_key,
dec_key=dec_key,
major_key=major_key
)
for i in range(1, len(catalogues)):
print(">>> Adding {} to prime catalogue".format(catalogues[i]))
prime_catalogue.merge_with_prime(catalogues[i],
declination_coords=dec_range2,
major_key=major_key,
major_multiplier=major_multiplier
)
prime_catalogue.writeout_stats()
if declination_range is not None:
PrimeCatalogue.trim_declination(prime_catalogue.table,
declination_range=declination_range,
ra=prime_catalogue.ra,
dec=prime_catalogue.dec)
if gp_range is not None:
prime_catalogue.trim_gp(gp_range)
if add_galactic: # add galactic coordinates:
prime_catalogue.add_galactic_coords()
prime_catalogue.writeout_stats()
prime_catalogue.writeout_prime(outname)
def get_args():
ps = ArgumentParser()
ps.add_argument("catalogues", nargs="*")
ps.add_argument("-s", "--separation", type=float, default=None,
help="Maximum separation to consider duplicate entries.")
ps.add_argument("-o", "--outname", default=None,
help="Output filename for the merged catalogue.")
ps.add_argument("-D", "--declination_range", nargs=2, default=None, type=float,
help="Declination range outside of which the output catalogue is trimmed.")
ps.add_argument("-D2", "--dec_range2", nargs="*", type=float, default=None)
ps.add_argument("-G", "--gp_range", nargs=2, type=float, default=None,
help="Galactic latitude within which the catalogue is clipped.")
ps.add_argument("-r", "--ra_key", default="RA", type=str)
ps.add_argument("-d", "--dec_key", default="DEC", type=str)
ps.add_argument("-M", "--major_key", default=None, type=str)
ps.add_argument("-m", "--major_multiplier", default=1., type=float)
ps.add_argument("--add_galactic", action="store_true")
args = ps.parse_args()
return args
def cli(args):
match_all(args.catalogues, args.separation, args.outname,
declination_range=args.declination_range,
gp_range=args.gp_range,
ra_key=args.ra_key,
dec_key=args.dec_key,
major_key=args.major_key,
major_multiplier=args.major_multiplier,
dec_range2=args.dec_range2,
add_galactic=args.add_galactic
)
if __name__ == "__main__":
cli(get_args())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment