Last active
June 14, 2022 07:27
-
-
Save Sunmish/6debab18866de83480b0de7ff542cbfb to your computer and use it in GitHub Desktop.
Merge source-lists to create a catalogue.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python | |
from argparse import ArgumentParser | |
import os | |
import numpy as np | |
from astropy.io import fits | |
from astropy.table import Table, vstack | |
from astropy.coordinates import SkyCoord | |
from astropy import units as u | |
REMOVE_COLS = [ | |
"Source_id", | |
"Isl_id", | |
"Gaus_id", | |
"Wave_id", | |
"Xposn", | |
"E_Xposn", | |
"Yposn", | |
"E_Yposn", | |
"Maj_img_plane", | |
"E_Maj_img_plane", | |
"Min_img_plane", | |
"E_min_img_plane", | |
"PA_img_plane", | |
"E_PA_img_plane", | |
"DC_PA_img_plane", | |
"E_DC_PA_img_plane", | |
"DC_Maj_img_plane", | |
"E_DC_Maj_img_plane", | |
"DC_Min_img_plane", | |
"E_DC_Min_img_plane", | |
"Isl_mean", | |
"Residual_Isl_mean" | |
] | |
class PrimeCatalogue(): | |
def __init__(self, catalogue, | |
duplicate_sep=None, | |
ra_key="RA", | |
dec_key="DEC", | |
major_key="Maj"): | |
self.table = Table.read(catalogue) | |
self.ra = ra_key | |
self.dec = dec_key | |
self.coords = SkyCoord(ra=self.table[self.ra], | |
dec=self.table[self.dec] | |
) | |
self.duplicate_sep = duplicate_sep | |
self.duplicate = np.full((len(self.table),), False, dtype=bool) | |
def merge_with_prime(self, catalogue, declination_coords=None, | |
major_key=None, major_multiplier=1.): | |
"""Merge secondary catalogue with primary catalogue. | |
Matching is to ensure duplicates are removed (from the secondary) and | |
the remainder of the table is appended to the primary catalogue. | |
""" | |
table = Table.read(catalogue) | |
coords = SkyCoord(ra=table[self.ra], | |
dec=table[self.dec] | |
) | |
if declination_coords is not None: | |
table, coords = PrimeCatalogue.trim_declination( | |
table=table, | |
declination_range=declination_coords, | |
ra=self.ra, | |
dec=self.dec | |
) | |
idx, seps, _ = coords.match_to_catalog_sky(self.coords) | |
nidx, nseps, _ = coords.match_to_catalog_sky(self.coords, nthneighbor=2) | |
if major_key is not None: | |
duplicate_sep1 = table[major_key] / 2. / major_multiplier | |
duplicate_sep2 = self.table[major_key] / 2. / major_multiplier | |
cond1 = seps.value < duplicate_sep1 | |
cond2 = seps.value < duplicate_sep2[idx] | |
temp_idx1 = np.where(cond1 & cond2)[0] | |
temp_idx2 = idx[temp_idx1] | |
temp_idx = np.array([table["Tile_centre_sep"][temp_idx1], | |
self.table["Tile_centre_sep"][temp_idx2]]).argmin(axis=0) | |
bool1 = np.full_like(idx, False, dtype=bool) | |
bool2 = np.full_like(self.coords.ra.value, False, dtype=bool) | |
include_idx1 = temp_idx1[np.where(temp_idx == 1)[0]] | |
include_idx2 = temp_idx2[np.where(temp_idx == 0)[0]] | |
bool1[include_idx1] = True | |
bool2[include_idx2] = True | |
self.table = vstack([self.table[~bool2], table[~bool1]]) | |
elif self.duplicate_sep is not None: | |
duplicate_sep = self.duplicate_sep | |
temp_idx1 = np.where(seps.value < duplicate_sep)[0] | |
temp_idx2 = idx[temp_idx1] | |
temp_idx = np.array([table["Tile_centre_sep"][temp_idx1], | |
self.table["Tile_centre_sep"][temp_idx2]]).argmin(axis=0) | |
bool1 = np.full_like(idx, False, dtype=bool) | |
bool2 = np.full_like(self.coords.ra.value, False, dtype=bool) | |
include_idx1 = temp_idx1[np.where(temp_idx == 1)[0]] | |
include_idx2 = temp_idx2[np.where(temp_idx == 0)[0]] | |
bool1[include_idx1] = True | |
bool2[include_idx2] = True | |
self.table = vstack([self.table[~bool2], table[~bool1]]) | |
# self.duplicate = vstack([self.duplicate[~bool2], | |
# np.full((len(table[~bool1]),), False, dtype=bool) | |
# ]) | |
else: | |
self.table = vstack([self.table, table]) | |
self.coords = SkyCoord(ra=self.table[self.ra], | |
dec=self.table[self.dec] | |
) | |
def _trim_declination(self, declination_range): | |
"""Trim catalogue based on max and minimum declination.""" | |
self.table, self.coords = PrimeCatalogue.trim_declination( | |
table=self.table, | |
declination_range=declination_range, | |
ra=self.ra, | |
dec=self.dec | |
) | |
def trim_gp(self, gp_range): | |
"""Trim catalogue based on Galactic latitude.""" | |
cond1 = self.coords.galactic.b.value < gp_range[0] | |
cond2 = self.coords.galactic.b.value > gp_range[1] | |
self.table = self.table[np.where(cond1 | cond2)[0]] | |
self.coords = SkyCoord(ra=self.table[self.ra], | |
dec=self.table[self.dec] | |
) | |
def add_galactic_coords(self, l="Gal_lon", b="Gal_lat"): | |
"""Add coordinate columns for Galactic latitude and longitude.""" | |
self.table.add_column(np.asarray(self.coords.galactic.l.value)*u.deg, name=l) | |
self.table.add_column(np.asarray(self.coords.galactic.b.value)*u.deg, name=b) | |
def writeout_stats(self): | |
"""Print out some statistics about the current prime catalogue.""" | |
n_sources = len(self.table) | |
print("N sources: {}".format(n_sources)) | |
def writeout_prime(self, outname): | |
remove_cols(self.table) | |
self.table.write(outname, format="fits", overwrite=True) | |
@staticmethod | |
def trim_declination(table, declination_range, ra, dec): | |
cond1 = table["DEC"] > declination_range[0] | |
cond2 = table["DEC"] < declination_range[1] | |
table = table[np.where(cond1 & cond2)[0]] | |
coords = SkyCoord(ra=table[ra], dec=table[dec]) | |
return table, coords | |
def remove_cols(table): | |
for col in REMOVE_COLS: | |
if col in table.columns: | |
table.remove_column(col) | |
def match_all(catalogues, sep, outname=None, | |
declination_range=None, | |
gp_range=None, | |
ra_key="RA", | |
dec_key="DEC", | |
major_key="Maj", | |
major_multiplier=1., | |
dec_range2=None, | |
add_galactic=False): | |
if len(catalogues) < 2: | |
raise RuntimeError("You need at least two catalogues to merge!") | |
if outname is None: | |
outname = catalogues[0].rstrip(".fits") + "_merged.fits" | |
print(">>> Initialising prime catalogue with {}".format(catalogues[0])) | |
prime_catalogue = PrimeCatalogue(catalogues[0], | |
duplicate_sep=sep, | |
ra_key=ra_key, | |
dec_key=dec_key, | |
major_key=major_key | |
) | |
for i in range(1, len(catalogues)): | |
print(">>> Adding {} to prime catalogue".format(catalogues[i])) | |
prime_catalogue.merge_with_prime(catalogues[i], | |
declination_coords=dec_range2, | |
major_key=major_key, | |
major_multiplier=major_multiplier | |
) | |
prime_catalogue.writeout_stats() | |
if declination_range is not None: | |
PrimeCatalogue.trim_declination(prime_catalogue.table, | |
declination_range=declination_range, | |
ra=prime_catalogue.ra, | |
dec=prime_catalogue.dec) | |
if gp_range is not None: | |
prime_catalogue.trim_gp(gp_range) | |
if add_galactic: # add galactic coordinates: | |
prime_catalogue.add_galactic_coords() | |
prime_catalogue.writeout_stats() | |
prime_catalogue.writeout_prime(outname) | |
def get_args(): | |
ps = ArgumentParser() | |
ps.add_argument("catalogues", nargs="*") | |
ps.add_argument("-s", "--separation", type=float, default=None, | |
help="Maximum separation to consider duplicate entries.") | |
ps.add_argument("-o", "--outname", default=None, | |
help="Output filename for the merged catalogue.") | |
ps.add_argument("-D", "--declination_range", nargs=2, default=None, type=float, | |
help="Declination range outside of which the output catalogue is trimmed.") | |
ps.add_argument("-D2", "--dec_range2", nargs="*", type=float, default=None) | |
ps.add_argument("-G", "--gp_range", nargs=2, type=float, default=None, | |
help="Galactic latitude within which the catalogue is clipped.") | |
ps.add_argument("-r", "--ra_key", default="RA", type=str) | |
ps.add_argument("-d", "--dec_key", default="DEC", type=str) | |
ps.add_argument("-M", "--major_key", default=None, type=str) | |
ps.add_argument("-m", "--major_multiplier", default=1., type=float) | |
ps.add_argument("--add_galactic", action="store_true") | |
args = ps.parse_args() | |
return args | |
def cli(args): | |
match_all(args.catalogues, args.separation, args.outname, | |
declination_range=args.declination_range, | |
gp_range=args.gp_range, | |
ra_key=args.ra_key, | |
dec_key=args.dec_key, | |
major_key=args.major_key, | |
major_multiplier=args.major_multiplier, | |
dec_range2=args.dec_range2, | |
add_galactic=args.add_galactic | |
) | |
if __name__ == "__main__": | |
cli(get_args()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment