Last active
May 8, 2024 09:12
-
-
Save mileslucas/3fc6529463b4454bf001efdb9624b321 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import OrderedDict | |
import multiprocessing | |
from pathlib import Path | |
import shutil | |
from typing import Union, Optional | |
from astropy.io import fits | |
import click | |
import pandas | |
from tqdm.auto import tqdm | |
## Functionality | |
def dict_from_header_file(filename: Union[str, Path], **kwargs) -> OrderedDict:
    """Read the FITS header of a file and return its cards as an ordered dictionary.

    The first entry is ``path``, holding the resolved absolute path of the
    file. The remaining entries are whatever ``dict_from_header`` produces for
    the header returned by ``fits.getheader(filename)``.

    Parameters
    ----------
    filename : str or Path
        FITS file to parse
    **kwargs
        All keyword arguments will be passed to ``dict_from_header``

    Returns
    -------
    OrderedDict
    """
    resolved = Path(filename).resolve().absolute()
    # seed the row with the path so it precedes every FITS header key
    row = OrderedDict(path=str(resolved))
    row.update(dict_from_header(fits.getheader(filename), **kwargs))
    return row
def dict_from_header(header: fits.Header, excluded=("COMMENT", "HISTORY")) -> OrderedDict:
    """Copy the cards of a FITS header into an ordered dictionary.

    Cards with an empty keyword and cards whose keyword appears in
    ``excluded`` are skipped; every other keyword/value pair is kept in header
    order.

    Parameters
    ----------
    header : Header
        FITS header to parse
    excluded : collection of str, optional
        Keywords to leave out, by default ``("COMMENT", "HISTORY")``

    Returns
    -------
    OrderedDict
    """
    return OrderedDict(
        (key, value)
        for key, value in header.items()
        if key != "" and key not in excluded
    )
def header_table(
    filenames, num_proc: Optional[int] = None, quiet: bool = False, **kwargs
) -> pandas.DataFrame:
    """Generate a pandas dataframe from the FITS headers parsed from the given files.

    Each file is parsed by ``dict_from_header_file`` in a worker process and
    contributes one row; rows are in the same order as ``filenames``.

    Parameters
    ----------
    filenames : iterable of str or Path
        FITS files to parse
    num_proc : int, optional
        Number of processes to use in multiprocessing, by default
        ``min(8, multiprocessing.cpu_count())``
    quiet : bool, optional
        Silence the progress bar, by default False
    **kwargs
        All keyword arguments will be passed to ``dict_from_header_file``

    Returns
    -------
    pandas.DataFrame
    """
    # materialise once so generators work and emptiness can be checked
    filenames = list(filenames)
    if not filenames:
        # nothing to parse: avoid spawning a worker pool for no work
        return pandas.DataFrame([])
    if num_proc is None:
        num_proc = min(8, multiprocessing.cpu_count())
    with multiprocessing.Pool(num_proc) as pool:
        jobs = [pool.apply_async(dict_from_header_file, args=(f,), kwds=kwargs) for f in filenames]
        progress = jobs if quiet else tqdm(jobs, desc="Parsing FITS headers")
        # results must be collected before the pool context exits
        rows = [job.get() for job in progress]
    return pandas.DataFrame(rows)
# sorting routines that the command-line interface dispatches to
def sort_files(
    filenames,
    copy: bool = False,
    output_directory: Optional[Path] = None,
    num_proc: int = min(8, multiprocessing.cpu_count()),
    quiet: bool = False,
    decompress: bool = False,
    **kwargs,
):
    """Sort many FITS files into folders using a pool of worker processes.

    Every file is handed to ``sort_file``; the returned list holds the new
    path of each file, in the same order as ``filenames``.

    Parameters
    ----------
    filenames : iterable of str or Path
        Files to sort. An empty input returns an empty list.
    copy : bool, optional
        Copy files instead of moving them, by default False
    output_directory : Path, optional
        Root of the sorted tree, by default the parent folder of the first file
    num_proc : int, optional
        Number of worker processes, by default ``min(8, multiprocessing.cpu_count())``
    quiet : bool, optional
        Silence the progress bar, by default False
    decompress : bool, optional
        Forwarded to ``sort_file``, by default False
    **kwargs
        All other keyword arguments will be passed to ``sort_file``

    Returns
    -------
    list
        Results of ``sort_file`` for each input file
    """
    # materialise once so generators work and the first element can be inspected
    filenames = list(filenames)
    if not filenames:
        # no files: nothing to infer an output folder from and nothing to do
        return []
    outdir = Path(output_directory) if output_directory is not None else Path(filenames[0]).parent
    # identical for every job, so build it once
    kwds = dict(outdir=outdir, copy=copy, decompress=decompress, **kwargs)
    with multiprocessing.Pool(num_proc) as pool:
        jobs = [pool.apply_async(sort_file, args=(filename,), kwds=kwds) for filename in filenames]
        progress = jobs if quiet else tqdm(jobs, desc="Sorting files")
        # results must be collected before the pool context exits
        results = [job.get() for job in progress]
    return results
def sort_file(
    filename, outdir, copy: bool = False, decompress: bool = False, **kwargs
) -> Path:
    """Place a single FITS file into the folder chosen by ``charis_foldername``.

    The destination folder is created if needed. Depending on the flags the
    file is decompressed into it, copied into it, or moved into it.

    Parameters
    ----------
    filename : str or Path
        FITS file to sort
    outdir : Path
        Root of the sorted tree
    copy : bool, optional
        Copy the file instead of moving it, by default False. Ignored when
        ``decompress`` is set.
    decompress : bool, optional
        Write HDU 1 of the file to a new file whose name has ``.fits.fz``
        replaced by ``.fits``; skipped if that file already exists. The
        original file is left in place. By default False.
    **kwargs
        All keyword arguments will be passed to ``fits.getheader``

    Returns
    -------
    Path
        Location of the file inside the sorted tree
    """
    path = Path(filename)
    header = fits.getheader(path, **kwargs)
    foldname = charis_foldername(outdir, header)
    foldname.mkdir(parents=True, exist_ok=True)
    newname = foldname / path.name
    if decompress:
        newname = foldname / path.name.replace(".fits.fz", ".fits")
        if not newname.exists():
            # NOTE(review): assumes the image lives in HDU 1 (as in
            # fpack-compressed files) - confirm for uncompressed inputs
            with fits.open(path) as hdul:
                fits.writeto(newname, hdul[1].data, header=hdul[1].header)
    elif copy:
        shutil.copy(path, newname)
    else:
        try:
            path.replace(newname)
        except OSError:
            # a plain rename cannot cross filesystems; shutil.move falls back
            # to copy-then-delete, and re-raises if the move is truly impossible
            shutil.move(str(path), str(newname))
    return newname
def charis_foldername(outdir: Path, header: fits.Header):
    """Choose the destination folder for a frame from its FITS header.

    The ``DATA-TYP`` card selects the top-level folder:

    - ``OBJECT``: the ``OBJECT`` value with spaces replaced by underscores
    - ``DARK``: ``darks``
    - ``SKYFLAT``: ``skies``
    - ``FLAT`` / ``DOMEFLAT``: ``flats``
    - ``COMPARISON``: ``wavecal`` when ``OBJECT`` ends with ``nm``, otherwise
      ``pinholes``
    - anything else: ``unsorted`` (no subfolder)

    Below that, frames are sub-sorted by filter (``Y_FLTNAM``), prism
    (``Y_PRISM``) and exposure time (``EXPTIME``); ``wavecal`` folders also
    include the ``OBJECT`` value. These cards are only read for recognised
    data types, so unrecognised frames do not need to carry them.

    Parameters
    ----------
    outdir : Path
        Root of the sorted tree
    header : Header
        FITS header of the frame

    Returns
    -------
    Path
    """
    dtype = header["DATA-TYP"]
    fixed_parents = {
        "DARK": "darks",
        "SKYFLAT": "skies",
        "FLAT": "flats",
        "DOMEFLAT": "flats",
    }
    extra = ""
    if dtype == "OBJECT":
        parent = header["OBJECT"].replace(" ", "_")
    elif dtype in fixed_parents:
        parent = fixed_parents[dtype]
    elif dtype == "COMPARISON":
        if header["OBJECT"].endswith("nm"):
            parent = "wavecal"
            # keep the OBJECT value in the folder name for wavecal frames
            extra = f"_{header['OBJECT']}"
        else:
            parent = "pinholes"
    else:
        # unknown type: do not require any instrument-specific cards
        return outdir / "unsorted"

    filt_str = header["Y_FLTNAM"]
    prism = header["Y_PRISM"]
    exptime = header["EXPTIME"]  # s
    subdir = f"{filt_str}_{prism}{extra}_{exptime:06.02f}s"
    return outdir / parent / subdir
########## CLI ########## | |
@click.command(
    name="charis_sort",
    short_help="Sort raw data",
    help="Sorts raw data based on the data type.",
)
@click.argument(
    "filenames", nargs=-1, type=click.Path(dir_okay=False, readable=True, path_type=Path)
)
@click.option(
    "--outdir",
    "-o",
    type=click.Path(file_okay=False, writable=True, path_type=Path),
    default=Path.cwd(),
    help="Output directory.",
)
@click.option(
    "--num-proc",
    "-j",
    type=click.IntRange(min=1),
    default=1,
    show_default=True,
    help="Number of worker processes to sort with.",
)
@click.option("--ext", "-e", default=0, help="HDU extension")
@click.option(
    "--copy/--no-copy",
    "-c/-nc",
    default=True,
    prompt="Would you like to copy files?",
    help="copy files instead of moving them",
)
@click.option(
    "-d",
    "--decompress",
    is_flag=True,
    prompt="Would you like to decompress .fits.fz files while copying?",
    help="decompress .fits.fz files into .fits files instead of copying or moving them",
)
@click.option("--quiet", "-q", is_flag=True, help="Silence progress bars and extraneous logging.")
def sort_raw(filenames, outdir, num_proc=1, ext=0, copy=False, quiet=False, decompress=False):
    """Command-line entry point: forward the parsed options to ``sort_files``.

    ``ext`` is passed through as an extra keyword argument of ``sort_files``;
    ``outdir`` is passed as its ``output_directory``.
    """
    sort_files(
        filenames,
        copy=copy,
        ext=ext,
        output_directory=outdir,
        num_proc=num_proc,
        quiet=quiet,
        decompress=decompress,
    )


if __name__ == "__main__":
    # run the click command when the file is executed as a script
    sort_raw()
Usage:
python charis_sort.py CRSA*.fits
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Requires the following: