Last active
April 22, 2021 06:15
-
-
Save KaoruNishikawa/79cb93b307f526dae3aaba9290873f4e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| [tool.poetry] | |
| name = "test" | |
| version = "0.1.0" | |
| description = "" | |
| [tool.poetry.dependencies] | |
| python = "^3.7" | |
| matplotlib = "^3.3.2" | |
| astropy = "^4.1" | |
| numpy = "^1.19.4" | |
| xarray = "^0.16.1" | |
| necstdb = "^0.2.3" | |
| n_const = "^0.1.0" | |
| [tool.poetry.dev-dependencies] | |
| pytest = "^5.2" | |
| black = "^20.8b1" | |
| flake8 = "^3.8.4" | |
| jupyter = "^1.0.0" | |
| jupyterlab = "^2.2.9" | |
| sphinx = "^3.3.1" | |
| sphinx-rtd-theme = "^0.5.0" | |
| m2r2 = "^0.2.7" | |
| ipykernel = "^5.3.4" | |
| [build-system] | |
| requires = ["poetry-core>=1.0.0"] | |
| build-backend = "poetry.core.masonry.api" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import pickle | |
| from pathlib import Path | |
| from datetime import datetime | |
| from typing import Union, Optional, Tuple | |
| import necstdb | |
| import numpy as np | |
| import xarray as xr | |
| import matplotlib | |
| import matplotlib.pyplot as plt | |
| import matplotlib.animation as animation | |
| from matplotlib.gridspec import GridSpec | |
| from astropy.coordinates import SkyCoord | |
| from astropy.coordinates import AltAz, FK5, Galactic | |
| import n_const.constants as n2const | |
| from nasco_analysis.kisa_rev import apply_kisa_test | |
| PathLike = Union[str, Path] | |
| timestamp2datetime = np.vectorize(datetime.utcfromtimestamp) | |
| class ScanCheck: | |
| """ | |
| Tool to check whether the data is taken with "normally" controlled | |
| system. | |
| """ | |
| ENCODER_TOPIC = "status_encoder" | |
| OBSMODE_TOPIC = "obsmode" | |
| def __init__(self, data_path: PathLike, kisa_path: PathLike = None) -> None: | |
| self.data_path = Path(data_path) | |
| self.kisa_path = Path(kisa_path) | |
| self.db = necstdb.opendb(self.data_path) | |
| self.encoder_data = self.load_data(self.ENCODER_TOPIC) | |
| self.obsmode_data = self.load_data(self.OBSMODE_TOPIC) | |
| self.target = str(self.data_path).split("_")[-1] | |
| def load_data(self, topic_name: str) -> dict: | |
| data = self.db.open_table(topic_name).read(astype="array") | |
| return {field: data[field] for field in data.dtype.names} | |
| def create_data_array(self, dump: bool = False) -> xr.Dataset: | |
| encoder_array = ( | |
| xr.Dataset({k: xr.DataArray(v) for k, v in self.encoder_data.items()}) | |
| .rename({"dim_0": "t", "timestamp": "t"}) | |
| .set_coords("t") | |
| .swap_dims({"t": "t"}) | |
| ) | |
| coords = self.trans_coords(encoder_array) | |
| obsmode_array = ( | |
| xr.Dataset({k: xr.DataArray(v) for k, v in self.obsmode_data.items()}) | |
| .rename({"dim_0": "t", "received_time": "t"}) | |
| .set_coords("t") | |
| .swap_dims({"t": "t"}) | |
| ) | |
| drive_data = encoder_array.assign( | |
| obsmode_array.reindex_like(encoder_array, "nearest") | |
| ).assign_coords(coords) | |
| if dump is True: | |
| pickle_dir = "./" | |
| if dump: | |
| pickle_path = Path(pickle_dir) / Path(self.data_path.stem).with_suffix( | |
| ".pickle" | |
| ) | |
| with open(pickle_path, "wb") as f: | |
| pickle.dump(drive_data, f) | |
| return pickle_path.absolute() | |
| return drive_data | |
| def trans_coords(self, enc: xr.Dataset) -> dict: | |
| """ | |
| Transform azel coordinate to equatorial and galactic coordinates. | |
| Notes | |
| ----- | |
| This function will take 100s to process data of length 300k. | |
| """ | |
| print( | |
| "Calculating coordinates, " | |
| f"will take about {enc.sizes.get('t') / 3000:.0f} s" | |
| ) | |
| # empirical, about 3000 data are processed per second | |
| _az = enc.enc_az / 3600 | |
| _el = enc.enc_el / 3600 | |
| if self.kisa_path: | |
| d_az, d_el = apply_kisa_test(azel=(_az, _el), hosei=self.kisa_path) | |
| else: | |
| d_az, d_el = 0, 0 | |
| az = _az + d_az / 3600 | |
| el = _el + d_el / 3600 | |
| coord_horizontal = SkyCoord( | |
| az=az.data, | |
| alt=el.data, | |
| frame=AltAz, | |
| location=n2const.LOC_NANTEN2, | |
| obstime=timestamp2datetime(enc.t), | |
| unit="deg", | |
| ) | |
| # If you give the coordinate as xr.DataArray, it may take 100 times longer | |
| # to get astropy object. | |
| coord_equatorial = coord_horizontal.transform_to( | |
| FK5 | |
| ) # this transformation take quite long time | |
| coord_galactic = coord_equatorial.transform_to(Galactic) | |
| ret = { | |
| "az": ("t", az), | |
| "el": ("t", el), | |
| "ra": ("t", coord_equatorial.ra), | |
| "dec": ("t", coord_equatorial.dec), | |
| "l": ("t", coord_galactic.l), | |
| "b": ("t", coord_galactic.b), | |
| } | |
| return ret | |
| class VisualizeScan: | |
| MAIN_OBSMODES = [b"ON ", b"OFF ", b"SKY "] | |
| OTHER_OBSMODES = [b" ", b"Non ", b"HOT "] | |
| PLOT_COLOR = { | |
| b"HOT ": "#F00", | |
| b"Non ": "#000", | |
| b"OFF ": "#0DF", | |
| b"SKY ": "#0DF", | |
| b"ON ": "#0F0", | |
| b" ": "#777", | |
| } | |
| COORD_MAP = { | |
| "horizontal": { | |
| "xy": ["az", "el"], | |
| "title": "Horizontal", | |
| "label": ["Az. [deg]", "El. [deg]"], | |
| }, | |
| "equatorial": { | |
| "xy": ["ra", "dec"], | |
| "title": "Equatorial (J2000)", | |
| "label": ["R.A. [deg]", "Dec. [deg]"], | |
| }, | |
| "galactic": { | |
| "xy": ["l", "b"], | |
| "title": "Galactic", | |
| "label": ["$l$ [deg]", "$b$ [deg]"], | |
| }, | |
| } | |
| def __init__( | |
| self, drive_data: xr.Dataset, observation: str = "NotSpecified" | |
| ) -> None: | |
| self.drive_data = drive_data | |
| self.observation = observation | |
| self.target = observation.split("_")[-1] | |
| @classmethod | |
| def from_pickle(cls, pickle_path: PathLike) -> None: | |
| observation = str(pickle_path).split(".")[0] | |
| with open(pickle_path, "rb") as p: | |
| return cls(pickle.load(p), observation) | |
| @classmethod | |
| def draw_data_centric(cls, ax: matplotlib.axes._axes.Axes) -> None: | |
| return NotImplemented | |
| def draw_one_coord(self, coord: str, ax: matplotlib.axes._axes.Axes = None) -> None: | |
| """ | |
| Parameters | |
| ---------- | |
| coord: str | |
| Either of ["horizontal", "equatorial", "galactic"]. | |
| ax: axes object of matplotlib | |
| Axis to which the data is drawn. | |
| """ | |
| if ax is None: | |
| fig, ax = plt.subplots(1, 1) | |
| coord = coord.lower() | |
| existing_obsmodes = np.unique(self.drive_data.obs_mode) | |
| # get common obsmodes | |
| main_obsmodes = set(self.MAIN_OBSMODES).intersection(existing_obsmodes) | |
| other_obsmodes = set(self.OTHER_OBSMODES).intersection(existing_obsmodes) | |
| x, y = self.COORD_MAP[coord]["xy"] | |
| def draw(zorder): | |
| mod = mode.decode("utf8") | |
| settings = { | |
| "s": 0.1, | |
| "zorder": zorder, | |
| "label": mod, | |
| "c": self.PLOT_COLOR[mode], | |
| } | |
| drive_dat = self.drive_data.where( | |
| self.drive_data.obs_mode == mode, drop=True | |
| ) | |
| ax.scatter(drive_dat[x], drive_dat[y], **settings) | |
| for mode in main_obsmodes: | |
| draw(zorder=-1) | |
| self.xlim, self.ylim = ax.get_xlim(), ax.get_ylim() | |
| if coord != "horizontal": | |
| self.xlim = self.xlim[::-1] | |
| for mode in other_obsmodes: | |
| draw(zorder=-2) | |
| ax.set( | |
| title=self.COORD_MAP[coord]["title"], | |
| xlim=self.xlim, | |
| ylim=self.ylim, | |
| xlabel=self.COORD_MAP[coord]["label"][0], | |
| ylabel=self.COORD_MAP[coord]["label"][1], | |
| ) | |
| ax.set_rasterization_zorder(0) | |
| ax.legend() | |
| ax.grid() | |
| return | |
| def draw_figure(self, save: Union[PathLike, bool] = False) -> Optional[Path]: | |
| print("Drawing a figure...") | |
| fig, axes = plt.subplots(3, 1, figsize=(14, 10)) | |
| self.draw_one_coord("horizontal", axes[0]) | |
| self.draw_one_coord("equatorial", axes[1]) | |
| self.draw_one_coord("galactic", axes[2]) | |
| plt.tight_layout() | |
| if save is True: | |
| save_dir = "./" | |
| if save: | |
| fig_path = Path(save_dir) / f"{self.target}_observation.pdf" | |
| plt.savefig(fig_path, dpi=150) | |
| return fig_path.absolute() | |
| def create_movie( | |
| self, | |
| coord: str = "galactic", | |
| save_dir: Union[PathLike, str] = None, | |
| bunch: int = 10, | |
| vrange: Tuple[float] = (-2000, 2000), | |
| plot_t_width: float = 15, | |
| ) -> Path: | |
| """ | |
| Create a movie of scan track. | |
| Notes | |
| ----- | |
| To run this function needs FFMpeg installed. If you don't have it, | |
| you can install by `brew install ffmpeg`. | |
| """ | |
| print("Creating a movie...") | |
| self.draw_one_coord(coord) | |
| data_frequency = self.drive_data.t.diff("t").mean() | |
| fig = plt.figure(figsize=(7, 7)) | |
| gs = GridSpec(nrows=2, ncols=1, height_ratios=[5, 3]) | |
| axes = { | |
| field: fig.add_subplot(spec) for field, spec in zip(["scan", "speed"], gs) | |
| } | |
| _x, _y = self.COORD_MAP[coord]["xy"] | |
| x = self.drive_data[_x].data | |
| y = self.drive_data[_y].data | |
| v_az = self.drive_data.az.differentiate("t").data * 3600 # arcsec | |
| v_el = self.drive_data.el.differentiate("t").data * 3600 # arcsec | |
| t = self.drive_data.timestamp.data | |
| obsmode = self.drive_data.obs_mode.data | |
| (az_speed,) = axes["speed"].plot([], [], "bo", ms=0.5) | |
| (el_speed,) = axes["speed"].plot([], [], "ro", ms=0.5) | |
| def init(): | |
| [ax.grid(True) for ax in axes.values()] | |
| axes["scan"].set( | |
| xlabel=self.COORD_MAP[coord]["label"][0], | |
| ylabel=self.COORD_MAP[coord]["label"][1], | |
| xlim=self.xlim, | |
| ylim=self.ylim, | |
| ) | |
| axes["speed"].set(xlabel="t", ylabel="v [arcsec/s]") | |
| axes["speed"].text( | |
| 0.99, | |
| 0.99, | |
| "BLUE: Az, RED: El", | |
| transform=axes["speed"].transAxes, | |
| ha="right", | |
| va="top", | |
| ) | |
| az_speed.set_data([], []) | |
| el_speed.set_data([], []) | |
| return (az_speed, el_speed) | |
| def update(i): | |
| j = int(max(0, i * bunch - 2 * plot_t_width // data_frequency)) | |
| x_ = x[i * bunch : (i + 1) * bunch] | |
| y_ = y[i * bunch : (i + 1) * bunch] | |
| v_az_ = v_az[j : (i + 1) * bunch].clip(*vrange) # arcsec | |
| v_el_ = v_el[j : (i + 1) * bunch].clip(*vrange) # arcsec | |
| t_ = t[j : (i + 1) * bunch] | |
| obsmode_ = obsmode[i * bunch : (i + 1) * bunch] | |
| settings = { | |
| "s": 0.05, | |
| "zorder": -1, | |
| "c": [self.PLOT_COLOR[mode] for mode in obsmode_], | |
| } | |
| axes["scan"].scatter(x_, y_, **settings) | |
| axes["scan"].set_rasterization_zorder(0) | |
| az_speed.set_data(t_, v_az_) | |
| el_speed.set_data(t_, v_el_) | |
| axes["speed"].set( | |
| xlim=(t_[-1] - np.timedelta64(plot_t_width, "s"), t_[-1]), ylim=vrange | |
| ) | |
| axes["speed"].tick_params(axis="x", labelrotation=40) | |
| return (az_speed, el_speed) | |
| movie = animation.FuncAnimation( | |
| fig, | |
| update, | |
| init_func=init, | |
| frames=range((self.drive_data.sizes.get("t") - 1) // bunch), | |
| # (length of diff array) // bunch | |
| blit=True, | |
| ) | |
| if save_dir is None: | |
| save_dir = "./" | |
| save_path = (Path(save_dir) / Path(self.target).stem).with_suffix(".mp4") | |
| ffmpeg_writer = animation.FFMpegWriter(fps=6, codec="h264") | |
| movie.save(str(save_path.absolute()), writer=ffmpeg_writer, dpi=150) | |
| return save_path.absolute() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment