Skip to content

Instantly share code, notes, and snippets.

@KaoruNishikawa
Last active April 22, 2021 06:15
Show Gist options
  • Select an option

  • Save KaoruNishikawa/79cb93b307f526dae3aaba9290873f4e to your computer and use it in GitHub Desktop.

Select an option

Save KaoruNishikawa/79cb93b307f526dae3aaba9290873f4e to your computer and use it in GitHub Desktop.
[tool.poetry]
name = "test"
version = "0.1.0"
description = ""
[tool.poetry.dependencies]
python = "^3.7"
matplotlib = "^3.3.2"
astropy = "^4.1"
numpy = "^1.19.4"
xarray = "^0.16.1"
necstdb = "^0.2.3"
n_const = "^0.1.0"
[tool.poetry.dev-dependencies]
pytest = "^5.2"
black = "^20.8b1"
flake8 = "^3.8.4"
jupyter = "^1.0.0"
jupyterlab = "^2.2.9"
sphinx = "^3.3.1"
sphinx-rtd-theme = "^0.5.0"
m2r2 = "^0.2.7"
ipykernel = "^5.3.4"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
import pickle
from pathlib import Path
from datetime import datetime
from typing import Union, Optional, Tuple
import necstdb
import numpy as np
import xarray as xr
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib.gridspec import GridSpec
from astropy.coordinates import SkyCoord
from astropy.coordinates import AltAz, FK5, Galactic
import n_const.constants as n2const
from nasco_analysis.kisa_rev import apply_kisa_test
PathLike = Union[str, Path]
timestamp2datetime = np.vectorize(datetime.utcfromtimestamp)
class ScanCheck:
"""
Tool to check whether the data is taken with "normally" controlled
system.
"""
ENCODER_TOPIC = "status_encoder"
OBSMODE_TOPIC = "obsmode"
def __init__(self, data_path: PathLike, kisa_path: PathLike = None) -> None:
self.data_path = Path(data_path)
self.kisa_path = Path(kisa_path)
self.db = necstdb.opendb(self.data_path)
self.encoder_data = self.load_data(self.ENCODER_TOPIC)
self.obsmode_data = self.load_data(self.OBSMODE_TOPIC)
self.target = str(self.data_path).split("_")[-1]
def load_data(self, topic_name: str) -> dict:
data = self.db.open_table(topic_name).read(astype="array")
return {field: data[field] for field in data.dtype.names}
def create_data_array(self, dump: bool = False) -> xr.Dataset:
encoder_array = (
xr.Dataset({k: xr.DataArray(v) for k, v in self.encoder_data.items()})
.rename({"dim_0": "t", "timestamp": "t"})
.set_coords("t")
.swap_dims({"t": "t"})
)
coords = self.trans_coords(encoder_array)
obsmode_array = (
xr.Dataset({k: xr.DataArray(v) for k, v in self.obsmode_data.items()})
.rename({"dim_0": "t", "received_time": "t"})
.set_coords("t")
.swap_dims({"t": "t"})
)
drive_data = encoder_array.assign(
obsmode_array.reindex_like(encoder_array, "nearest")
).assign_coords(coords)
if dump is True:
pickle_dir = "./"
if dump:
pickle_path = Path(pickle_dir) / Path(self.data_path.stem).with_suffix(
".pickle"
)
with open(pickle_path, "wb") as f:
pickle.dump(drive_data, f)
return pickle_path.absolute()
return drive_data
def trans_coords(self, enc: xr.Dataset) -> dict:
"""
Transform azel coordinate to equatorial and galactic coordinates.
Notes
-----
This function will take 100s to process data of length 300k.
"""
print(
"Calculating coordinates, "
f"will take about {enc.sizes.get('t') / 3000:.0f} s"
)
# empirical, about 3000 data are processed per second
_az = enc.enc_az / 3600
_el = enc.enc_el / 3600
if self.kisa_path:
d_az, d_el = apply_kisa_test(azel=(_az, _el), hosei=self.kisa_path)
else:
d_az, d_el = 0, 0
az = _az + d_az / 3600
el = _el + d_el / 3600
coord_horizontal = SkyCoord(
az=az.data,
alt=el.data,
frame=AltAz,
location=n2const.LOC_NANTEN2,
obstime=timestamp2datetime(enc.t),
unit="deg",
)
# If you give the coordinate as xr.DataArray, it may take 100 times longer
# to get astropy object.
coord_equatorial = coord_horizontal.transform_to(
FK5
) # this transformation take quite long time
coord_galactic = coord_equatorial.transform_to(Galactic)
ret = {
"az": ("t", az),
"el": ("t", el),
"ra": ("t", coord_equatorial.ra),
"dec": ("t", coord_equatorial.dec),
"l": ("t", coord_galactic.l),
"b": ("t", coord_galactic.b),
}
return ret
class VisualizeScan:
MAIN_OBSMODES = [b"ON ", b"OFF ", b"SKY "]
OTHER_OBSMODES = [b" ", b"Non ", b"HOT "]
PLOT_COLOR = {
b"HOT ": "#F00",
b"Non ": "#000",
b"OFF ": "#0DF",
b"SKY ": "#0DF",
b"ON ": "#0F0",
b" ": "#777",
}
COORD_MAP = {
"horizontal": {
"xy": ["az", "el"],
"title": "Horizontal",
"label": ["Az. [deg]", "El. [deg]"],
},
"equatorial": {
"xy": ["ra", "dec"],
"title": "Equatorial (J2000)",
"label": ["R.A. [deg]", "Dec. [deg]"],
},
"galactic": {
"xy": ["l", "b"],
"title": "Galactic",
"label": ["$l$ [deg]", "$b$ [deg]"],
},
}
def __init__(
self, drive_data: xr.Dataset, observation: str = "NotSpecified"
) -> None:
self.drive_data = drive_data
self.observation = observation
self.target = observation.split("_")[-1]
@classmethod
def from_pickle(cls, pickle_path: PathLike) -> None:
observation = str(pickle_path).split(".")[0]
with open(pickle_path, "rb") as p:
return cls(pickle.load(p), observation)
@classmethod
def draw_data_centric(cls, ax: matplotlib.axes._axes.Axes) -> None:
return NotImplemented
def draw_one_coord(self, coord: str, ax: matplotlib.axes._axes.Axes = None) -> None:
"""
Parameters
----------
coord: str
Either of ["horizontal", "equatorial", "galactic"].
ax: axes object of matplotlib
Axis to which the data is drawn.
"""
if ax is None:
fig, ax = plt.subplots(1, 1)
coord = coord.lower()
existing_obsmodes = np.unique(self.drive_data.obs_mode)
# get common obsmodes
main_obsmodes = set(self.MAIN_OBSMODES).intersection(existing_obsmodes)
other_obsmodes = set(self.OTHER_OBSMODES).intersection(existing_obsmodes)
x, y = self.COORD_MAP[coord]["xy"]
def draw(zorder):
mod = mode.decode("utf8")
settings = {
"s": 0.1,
"zorder": zorder,
"label": mod,
"c": self.PLOT_COLOR[mode],
}
drive_dat = self.drive_data.where(
self.drive_data.obs_mode == mode, drop=True
)
ax.scatter(drive_dat[x], drive_dat[y], **settings)
for mode in main_obsmodes:
draw(zorder=-1)
self.xlim, self.ylim = ax.get_xlim(), ax.get_ylim()
if coord != "horizontal":
self.xlim = self.xlim[::-1]
for mode in other_obsmodes:
draw(zorder=-2)
ax.set(
title=self.COORD_MAP[coord]["title"],
xlim=self.xlim,
ylim=self.ylim,
xlabel=self.COORD_MAP[coord]["label"][0],
ylabel=self.COORD_MAP[coord]["label"][1],
)
ax.set_rasterization_zorder(0)
ax.legend()
ax.grid()
return
def draw_figure(self, save: Union[PathLike, bool] = False) -> Optional[Path]:
print("Drawing a figure...")
fig, axes = plt.subplots(3, 1, figsize=(14, 10))
self.draw_one_coord("horizontal", axes[0])
self.draw_one_coord("equatorial", axes[1])
self.draw_one_coord("galactic", axes[2])
plt.tight_layout()
if save is True:
save_dir = "./"
if save:
fig_path = Path(save_dir) / f"{self.target}_observation.pdf"
plt.savefig(fig_path, dpi=150)
return fig_path.absolute()
def create_movie(
self,
coord: str = "galactic",
save_dir: Union[PathLike, str] = None,
bunch: int = 10,
vrange: Tuple[float] = (-2000, 2000),
plot_t_width: float = 15,
) -> Path:
"""
Create a movie of scan track.
Notes
-----
To run this function needs FFMpeg installed. If you don't have it,
you can install by `brew install ffmpeg`.
"""
print("Creating a movie...")
self.draw_one_coord(coord)
data_frequency = self.drive_data.t.diff("t").mean()
fig = plt.figure(figsize=(7, 7))
gs = GridSpec(nrows=2, ncols=1, height_ratios=[5, 3])
axes = {
field: fig.add_subplot(spec) for field, spec in zip(["scan", "speed"], gs)
}
_x, _y = self.COORD_MAP[coord]["xy"]
x = self.drive_data[_x].data
y = self.drive_data[_y].data
v_az = self.drive_data.az.differentiate("t").data * 3600 # arcsec
v_el = self.drive_data.el.differentiate("t").data * 3600 # arcsec
t = self.drive_data.timestamp.data
obsmode = self.drive_data.obs_mode.data
(az_speed,) = axes["speed"].plot([], [], "bo", ms=0.5)
(el_speed,) = axes["speed"].plot([], [], "ro", ms=0.5)
def init():
[ax.grid(True) for ax in axes.values()]
axes["scan"].set(
xlabel=self.COORD_MAP[coord]["label"][0],
ylabel=self.COORD_MAP[coord]["label"][1],
xlim=self.xlim,
ylim=self.ylim,
)
axes["speed"].set(xlabel="t", ylabel="v [arcsec/s]")
axes["speed"].text(
0.99,
0.99,
"BLUE: Az, RED: El",
transform=axes["speed"].transAxes,
ha="right",
va="top",
)
az_speed.set_data([], [])
el_speed.set_data([], [])
return (az_speed, el_speed)
def update(i):
j = int(max(0, i * bunch - 2 * plot_t_width // data_frequency))
x_ = x[i * bunch : (i + 1) * bunch]
y_ = y[i * bunch : (i + 1) * bunch]
v_az_ = v_az[j : (i + 1) * bunch].clip(*vrange) # arcsec
v_el_ = v_el[j : (i + 1) * bunch].clip(*vrange) # arcsec
t_ = t[j : (i + 1) * bunch]
obsmode_ = obsmode[i * bunch : (i + 1) * bunch]
settings = {
"s": 0.05,
"zorder": -1,
"c": [self.PLOT_COLOR[mode] for mode in obsmode_],
}
axes["scan"].scatter(x_, y_, **settings)
axes["scan"].set_rasterization_zorder(0)
az_speed.set_data(t_, v_az_)
el_speed.set_data(t_, v_el_)
axes["speed"].set(
xlim=(t_[-1] - np.timedelta64(plot_t_width, "s"), t_[-1]), ylim=vrange
)
axes["speed"].tick_params(axis="x", labelrotation=40)
return (az_speed, el_speed)
movie = animation.FuncAnimation(
fig,
update,
init_func=init,
frames=range((self.drive_data.sizes.get("t") - 1) // bunch),
# (length of diff array) // bunch
blit=True,
)
if save_dir is None:
save_dir = "./"
save_path = (Path(save_dir) / Path(self.target).stem).with_suffix(".mp4")
ffmpeg_writer = animation.FFMpegWriter(fps=6, codec="h264")
movie.save(str(save_path.absolute()), writer=ffmpeg_writer, dpi=150)
return save_path.absolute()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment