|  | #!/usr/bin/env python | 
        
          |  | # encoding: utf-8 | 
        
          |  | """ | 
        
          |  | *Stitch together SOXS simulated FITS headers and data generated from the E2E simulator* | 
        
          |  |  | 
        
          |  | :Author: | 
        
          |  | David Young | 
        
          |  |  | 
        
          |  | :Date Created: | 
        
          |  | June  1, 2021 | 
        
          |  |  | 
        
          |  | Usage: | 
        
          |  | soxssim <arm> <type> [-t <tag>] [<exptime>] -i <simulatedDataRoot> -o <outputPath> | 
        
          |  |  | 
        
          |  | Options: | 
        
          |  | -h, --help                                            show this help message | 
        
          |  | -i <simulatedDataRoot>, --input <simulatedDataRoot>   path to the root directory of the simulated data (containing `data` and `headers` directories) | 
        
          |  | -o <outputPath>, --output <outputPath>                path to directory to output the requested frames to | 
        
          |  | -t <tag>, --tag <tag>                                 file tags (e.g. CR == cosmic rays) | 
        
          |  | """ | 
        
          |  | ################# GLOBAL IMPORTS #################### | 
        
          |  | import sys | 
        
          |  | import os | 
        
          |  | from fundamentals import tools | 
        
          |  | import pandas as pd | 
        
          |  | from tabulate import tabulate | 
        
          |  | from astropy.io import fits | 
        
          |  | from os.path import expanduser | 
        
          |  | from ccdproc import ImageFileCollection | 
        
          |  | from datetime import datetime, date, time, timedelta | 
        
          |  | from astrocalc.times import conversions | 
        
          |  |  | 
        
          |  |  | 
        
          |  | def main(arguments=None): | 
        
          |  | """ | 
        
          |  | *The main function used when ``soxssim.py`` is run as a single script from the cl* | 
        
          |  | """ | 
        
          |  |  | 
        
          |  | # SETUP THE COMMAND-LINE UTIL SETTINGS | 
        
          |  | su = tools( | 
        
          |  | arguments=arguments, | 
        
          |  | docString=__doc__, | 
        
          |  | logLevel="WARNING", | 
        
          |  | options_first=False, | 
        
          |  | projectName=False | 
        
          |  | ) | 
        
          |  | arguments, settings, log, dbConn = su.setup() | 
        
          |  |  | 
        
          |  | # UNPACK REMAINING CL ARGUMENTS USING `EXEC` TO SETUP THE VARIABLE NAMES | 
        
          |  | # AUTOMATICALLY | 
        
          |  | a = {} | 
        
          |  | for arg, val in list(arguments.items()): | 
        
          |  | if arg[0] == "-": | 
        
          |  | varname = arg.replace("-", "") + "Flag" | 
        
          |  | else: | 
        
          |  | varname = arg.replace("<", "").replace(">", "") | 
        
          |  | a[varname] = val | 
        
          |  | if arg == "--dbConn": | 
        
          |  | dbConn = val | 
        
          |  | a["dbConn"] = val | 
        
          |  | log.debug('%s = %s' % (varname, val,)) | 
        
          |  |  | 
        
          |  | a['arm'] = a['arm'].upper() | 
        
          |  | a["type"] = a['type'].upper() | 
        
          |  | a["exptime"] = int(a['exptime']) | 
        
          |  | a["tag"] = a["tagFlag"] | 
        
          |  | if a["tag"]: | 
        
          |  | a["tag"] = f"_{a['tag']}" | 
        
          |  | else: | 
        
          |  | a["tag"] = "" | 
        
          |  |  | 
        
          |  | # FIND FITS DATA FRAMES MATCHING CRITERIA | 
        
          |  | # GENERATE A LIST OF FILE PATHS | 
        
          |  | simulatedDataRoot = a["inputFlag"] | 
        
          |  | # MAKE RELATIVE HOME PATH ABSOLUTE | 
        
          |  | home = expanduser("~") | 
        
          |  | if simulatedDataRoot[0] == "~": | 
        
          |  | simulatedDataRoot = simulatedDataRoot.replace("~", home) | 
        
          |  | if not len(simulatedDataRoot): | 
        
          |  | simulatedDataRoot = "." | 
        
          |  | simulatedDataRoot = os.path.abspath(simulatedDataRoot) | 
        
          |  |  | 
        
          |  | imageFrames = get_data_images(log=log, simulatedDataRoot=simulatedDataRoot, arm=a['arm'], ttype=a[ | 
        
          |  | 'type'], tag=a['tag'], exptime=a['exptime']) | 
        
          |  |  | 
        
          |  | print("INPUT FRAMES") | 
        
          |  | from tabulate import tabulate | 
        
          |  | print(tabulate(imageFrames, headers='keys', tablefmt='psql')) | 
        
          |  |  | 
        
          |  | # print(imageFrames) | 
        
          |  | # sys.exit(0) | 
        
          |  |  | 
        
          |  | primHdr, extHdr = get_header_template( | 
        
          |  | log=log, | 
        
          |  | simulatedDataRoot=simulatedDataRoot, | 
        
          |  | arm=a['arm'], | 
        
          |  | ttype=a['type'] | 
        
          |  | ) | 
        
          |  |  | 
        
          |  | outputPaths, outputDir = write_simulated_frames(log=log, outputDir=a[ | 
        
          |  | "outputFlag"], imageFrames=imageFrames, primHdr=primHdr, extHdr=extHdr, arm=a["arm"], ttype=a["type"], tag=a["tag"], exptime=a["exptime"]) | 
        
          |  |  | 
        
          |  | # GENERATE A LIST OF OUTPUT FITS FILE PATHS | 
        
          |  | fitsPaths = [] | 
        
          |  | for d in os.listdir(outputDir): | 
        
          |  | filepath = os.path.join(outputDir, d) | 
        
          |  | if os.path.isfile(filepath) and os.path.splitext(filepath)[1] == ".fits": | 
        
          |  | fitsPaths.append(filepath) | 
        
          |  |  | 
        
          |  | # GENERATE THE IMAGECOLLECTION | 
        
          |  | keys = ['INSTRUME', 'ESO DPR CATG', | 
        
          |  | 'ESO DPR TYPE', 'ESO DPR TECH', 'ESO TPL NAME', 'ESO SEQ ARM'] | 
        
          |  | # keys = ['imagetyp', 'object', 'filter', 'exposure'] | 
        
          |  | collection = ImageFileCollection(filenames=fitsPaths, keywords=keys) | 
        
          |  | collection.sort(['file']) | 
        
          |  |  | 
        
          |  | from tabulate import tabulate | 
        
          |  | print("OUTPUT FRAMES") | 
        
          |  | print(tabulate(collection.summary, headers='keys', tablefmt='psql')) | 
        
          |  |  | 
        
          |  | return | 
        
          |  |  | 
        
          |  |  | 
        
          |  | def get_data_images( | 
        
          |  | log, | 
        
          |  | simulatedDataRoot, | 
        
          |  | arm, | 
        
          |  | ttype, | 
        
          |  | tag, | 
        
          |  | exptime): | 
        
          |  | """*return a pandas data-frame of data images matching requested criteria* | 
        
          |  |  | 
        
          |  | **Key Arguments:** | 
        
          |  |  | 
        
          |  | - `log` -- logger | 
        
          |  | - `simulatedDataRoot` -- path to the root directory of the simulated data | 
        
          |  | - `arm` -- NIR/VIS | 
        
          |  | - `type` -- the frame type | 
        
          |  | - `tag` -- extra metatag for input frames (e.g. CR for cosmic rays) | 
        
          |  | - `exptime` -- the exposure time | 
        
          |  |  | 
        
          |  | **Return:** | 
        
          |  |  | 
        
          |  | - `imageFrames` -- pandas data-frame of images | 
        
          |  | """ | 
        
          |  | log.debug('starting the ``functionName`` function') | 
        
          |  |  | 
        
          |  | pathToDataDirectory = f"{simulatedDataRoot}/data/{arm}/{ttype}{tag}" | 
        
          |  |  | 
        
          |  | fitsNames = [] | 
        
          |  | fitsPaths = [] | 
        
          |  | types = [] | 
        
          |  | exptimes = [] | 
        
          |  | for d in os.listdir(pathToDataDirectory): | 
        
          |  | filepath = os.path.join(pathToDataDirectory, d) | 
        
          |  | if os.path.isfile(filepath) and os.path.splitext(filepath)[1] == ".fits": | 
        
          |  | fitsNames.append(d) | 
        
          |  | fitsPaths.append(filepath) | 
        
          |  | meta = d.split("_") | 
        
          |  | types.append(meta[0]) | 
        
          |  | exptimes.append(int(meta[1].replace("s", ""))) | 
        
          |  |  | 
        
          |  | # CREATE DATA FRAME FROM A DICTIONARY OF LISTS | 
        
          |  | dataFrames = pd.DataFrame({ | 
        
          |  | "filepath": fitsPaths, | 
        
          |  | "filename": fitsNames, | 
        
          |  | "type": types, | 
        
          |  | "exptime": exptimes | 
        
          |  | }) | 
        
          |  | # SORT BY COLUMN NAME | 
        
          |  | dataFrames.sort_values( | 
        
          |  | by=['type', 'exptime', 'filename'], inplace=True) | 
        
          |  |  | 
        
          |  | # NOW FILTER BY GIVEN CRITERIA | 
        
          |  | # FILTER DATA FRAME | 
        
          |  | # FIRST CREATE THE MASK | 
        
          |  | mask = (dataFrames['exptime'].isin([exptime])) | 
        
          |  | filteredFrames = dataFrames.loc[mask] | 
        
          |  | if not len(filteredFrames.index): | 
        
          |  | print(f"There are {arm} no data-frames of type '{ttype}' and exptime '{exptime}'. Here are the frames you can choose from:") | 
        
          |  | print(tabulate(dataFrames, headers='keys', tablefmt='psql')) | 
        
          |  | sys.exit(0) | 
        
          |  |  | 
        
          |  | log.debug('completed the ``functionName`` function') | 
        
          |  | return filteredFrames | 
        
          |  |  | 
        
          |  |  | 
        
          |  | def get_header_template( | 
        
          |  | log, | 
        
          |  | simulatedDataRoot, | 
        
          |  | arm, | 
        
          |  | ttype): | 
        
          |  | """*return the primary and extension header templates to use when stitching data into simulated frames* | 
        
          |  |  | 
        
          |  | **Key Arguments:** | 
        
          |  |  | 
        
          |  | - `log` -- logger | 
        
          |  | - `simulatedDataRoot` -- path to the root directory of the simulated data | 
        
          |  | - `arm` -- NIR/VIS | 
        
          |  | - `type` -- the frame type | 
        
          |  |  | 
        
          |  | *Return* | 
        
          |  |  | 
        
          |  | - ``primHdr`` -- the template primary header | 
        
          |  | - ``extHdr`` -- the template extension header | 
        
          |  | """ | 
        
          |  | log.debug('starting the ``get_header_template`` function') | 
        
          |  |  | 
        
          |  | tpl = None | 
        
          |  | if ttype == "FLAT": | 
        
          |  | ttype = "FLAT,LAMP" | 
        
          |  | tpl = "Flat Field calibration for NIR" | 
        
          |  | elif ttype == "ORDER_CENTRE": | 
        
          |  | ttype = "FLAT,LAMP" | 
        
          |  | tpl = "Lamp Flat Single Pinhole for NIR" | 
        
          |  | elif ttype == "SPH_ARC": | 
        
          |  | ttype = "STD,WAVE" | 
        
          |  | tpl = "Arcs Pinhole Spectral calibration for NIR" | 
        
          |  | elif ttype == "MPH_ARC": | 
        
          |  | ttype = "STD,WAVE" | 
        
          |  | tpl = "Arcs Pinhole Spectral calibration for NIR" | 
        
          |  | elif ttype == "STARE": | 
        
          |  | ttype = "OBJECT" | 
        
          |  | tpl = "Observation template with async exposures" | 
        
          |  |  | 
        
          |  | # NOW GET THE FITS HEADERS | 
        
          |  | pathToHeaderDirectory = f"{simulatedDataRoot}/headers" | 
        
          |  |  | 
        
          |  | # GENERATE A LIST OF FITS FILE PATHS | 
        
          |  | fitsPaths = [] | 
        
          |  | for d in os.listdir(pathToHeaderDirectory): | 
        
          |  | filepath = os.path.join(pathToHeaderDirectory, d) | 
        
          |  | if os.path.isfile(filepath) and os.path.splitext(filepath)[1] == ".fits": | 
        
          |  | fitsPaths.append(filepath) | 
        
          |  |  | 
        
          |  | # GENERATE THE IMAGECOLLECTION | 
        
          |  | keys = ['INSTRUME', 'ESO DPR CATG', | 
        
          |  | 'ESO DPR TYPE', 'ESO DPR TECH', 'ESO TPL NAME', 'ESO SEQ ARM'] | 
        
          |  | # keys = ['imagetyp', 'object', 'filter', 'exposure'] | 
        
          |  | collection = ImageFileCollection(filenames=fitsPaths, keywords=keys) | 
        
          |  |  | 
        
          |  | collection.sort(['file']) | 
        
          |  |  | 
        
          |  | matches = (collection.summary['INSTRUME'] == "SOXS") & ( | 
        
          |  | collection.summary['ESO SEQ ARM'] == arm.upper()) & (collection.summary['ESO DPR TYPE'] == ttype.upper()) | 
        
          |  | if tpl: | 
        
          |  | matches = matches & (collection.summary['ESO TPL NAME'] == tpl) | 
        
          |  | headers = collection.summary[matches] | 
        
          |  | from tabulate import tabulate | 
        
          |  | print("MATCHED HEADERS") | 
        
          |  | print(tabulate(headers, headers='keys', tablefmt='psql')) | 
        
          |  |  | 
        
          |  | if not len(headers): | 
        
          |  | print(f"There are no {arm} header-frames of type '{ttype}'. Here are the frames you can choose from:") | 
        
          |  | collection.summary.pprint_all() | 
        
          |  | sys.exit(0) | 
        
          |  |  | 
        
          |  | # OPEN TEMPLATE HEADERS | 
        
          |  | templateHeaderFile = headers[0]["file"] | 
        
          |  | with fits.open(templateHeaderFile, "readonly") as hdul: | 
        
          |  | primHdr = hdul[0].header | 
        
          |  | extHdr = hdul[1].header | 
        
          |  |  | 
        
          |  | log.debug('completed the ``get_header_template`` function') | 
        
          |  | return primHdr, extHdr | 
        
          |  |  | 
        
          |  |  | 
        
          |  | def write_simulated_frames( | 
        
          |  | log, | 
        
          |  | outputDir, | 
        
          |  | imageFrames, | 
        
          |  | primHdr, | 
        
          |  | extHdr, | 
        
          |  | arm, | 
        
          |  | ttype, | 
        
          |  | tag, | 
        
          |  | exptime): | 
        
          |  | """*summary of function* | 
        
          |  |  | 
        
          |  | **Key Arguments:** | 
        
          |  |  | 
        
          |  | - `log` -- logger | 
        
          |  | - `outputDir` -- path to directory to output the requested frames to | 
        
          |  | - `imageFrames` -- imagecollection of the image frames | 
        
          |  | - `primHdr` -- template primary header | 
        
          |  | - `extHdr` -- template extension header | 
        
          |  | - `arm` -- NIR/VIS | 
        
          |  | - `type` -- the frame type | 
        
          |  | - `tag` -- extra metatag for input frames (e.g. CR for cosmic rays) | 
        
          |  | - `exptime` -- the exposure time | 
        
          |  |  | 
        
          |  | **Return** | 
        
          |  |  | 
        
          |  | - ``outputPaths`` -- list of paths to the output frames | 
        
          |  |  | 
        
          |  | """ | 
        
          |  | log.debug('starting the ``functionName`` function') | 
        
          |  |  | 
        
          |  | # FORMAT OUTPUT PATH | 
        
          |  | home = expanduser("~") | 
        
          |  | outputDir = outputDir.replace("~", home) + f'/{ttype}{tag}' | 
        
          |  | # RECURSIVELY CREATE MISSING DIRECTORIES | 
        
          |  | if not os.path.exists(outputDir): | 
        
          |  | os.makedirs(outputDir) | 
        
          |  |  | 
        
          |  | fh_type = ttype | 
        
          |  |  | 
        
          |  | if fh_type == "FLAT": | 
        
          |  | fh_type = "LAMP,FLAT" | 
        
          |  | elif fh_type == "ORDER_CENTRE": | 
        
          |  | fh_type = "LAMP,ORDERDEF" | 
        
          |  | elif fh_type == 'SPH_ARC': | 
        
          |  | fh_type = "LAMP,FMTCHK" | 
        
          |  | elif fh_type == 'MPH_ARC': | 
        
          |  | fh_type = "LAMP,WAVE" | 
        
          |  | elif fh_type == 'STARE': | 
        
          |  | fh_type = "OBJECT" | 
        
          |  |  | 
        
          |  | # ASTROCALC CONVERTER | 
        
          |  | converter = conversions( | 
        
          |  | log=log | 
        
          |  | ) | 
        
          |  | outputPaths = [] | 
        
          |  | numberFrames = len(imageFrames) | 
        
          |  | for i, frame in enumerate(imageFrames['filepath']): | 
        
          |  | now = datetime.now() + timedelta(seconds=float(exptime) * i) | 
        
          |  | nowStr = now.strftime("%Y-%m-%dT%H:%M:%S.%f") | 
        
          |  |  | 
        
          |  | thisHrd = primHdr.copy() | 
        
          |  |  | 
        
          |  | # CREATE DICTIONARY OF HEADER KEYWORDS TO UPDATE | 
        
          |  | kw = { | 
        
          |  | "DATE": nowStr, | 
        
          |  | "EXPTIME": float(exptime), | 
        
          |  | "DATE-OBS": nowStr, | 
        
          |  | "MJD-OBS": (converter.ut_datetime_to_mjd(utDatetime=nowStr), nowStr), | 
        
          |  | "ESO DET SEQ1 DIT": float(exptime), | 
        
          |  | "ESO DET SEQ1 EXPTIME": float(exptime), | 
        
          |  | "ESO TPL NEXP": numberFrames, | 
        
          |  | "ESO TPL EXPNO": i + 1, | 
        
          |  | "ESO DPR TYPE": fh_type | 
        
          |  | } | 
        
          |  |  | 
        
          |  | onofftag = "" | 
        
          |  | if arm == "NIR": | 
        
          |  | basename = os.path.basename(frame) | 
        
          |  | if "_ON" in basename: | 
        
          |  | if ttype == "FLAT": | 
        
          |  | kw["ESO DPR TECH"] = "ECHELLE,SLIT" | 
        
          |  | elif ttype in ["ORDER_CENTRE", "SPH_ARC"]: | 
        
          |  | kw["ESO DPR TECH"] = "ECHELLE,PINHOLE" | 
        
          |  | elif ttype in ["MPH_ARC"]: | 
        
          |  | kw["ESO DPR TECH"] = "ECHELLE,MULTI-PINHOLE" | 
        
          |  | elif ttype in ["STARE"]: | 
        
          |  | kw["ESO DPR TECH"] = "ECHELLE,SLIT,STARE" | 
        
          |  | onofftag = "_ON" | 
        
          |  | elif "_OFF" in basename: | 
        
          |  | kw["ESO DPR TECH"] = "IMAGE" | 
        
          |  | kw["ESO DPR CATG"] = "CALIB" | 
        
          |  | onofftag = "_OFF" | 
        
          |  |  | 
        
          |  | # UPDATE PRIMARY HEADER | 
        
          |  | for k, v in kw.items(): | 
        
          |  | thisHrd[k] = v | 
        
          |  |  | 
        
          |  | # CREATE DICTIONARY OF HEADER KEYWORDS TO UPDATE | 
        
          |  | kw = { | 
        
          |  | "EXPTIME": float(exptime), | 
        
          |  | "ESO DET EXP UTC": nowStr, | 
        
          |  | "ESO DET FRAM UTC": nowStr, | 
        
          |  | "ESO DET SEQ UTC": nowStr | 
        
          |  | } | 
        
          |  | # UPDATE EXTENSION HEADER | 
        
          |  | for k, v in kw.items(): | 
        
          |  | extHdr[k] = v | 
        
          |  |  | 
        
          |  | with fits.open(frame, "readonly") as hdul: | 
        
          |  | data = hdul[0].data | 
        
          |  |  | 
        
          |  | # CREATE THE FITS FRAME | 
        
          |  | primHDU = fits.PrimaryHDU(header=thisHrd) | 
        
          |  | # primHDU.header = primHdr | 
        
          |  | imageHDU = fits.ImageHDU(header=extHdr, data=data) | 
        
          |  |  | 
        
          |  | hdul = fits.HDUList([primHDU, imageHDU]) | 
        
          |  | outPath = f"{outputDir}/SOXS_{arm}_{ttype}{onofftag}_{exptime}_{i+1:03d}.fits" | 
        
          |  | hdul.writeto(outPath, output_verify="fix+warn", | 
        
          |  | overwrite=True, checksum=False) | 
        
          |  | outputPaths.append(outPath) | 
        
          |  |  | 
        
          |  | log.debug('completed the ``functionName`` function') | 
        
          |  | return outputPaths, outputDir | 
        
          |  |  | 
        
          |  |  | 
        
          |  | if __name__ == '__main__': | 
        
          |  | main() |