import argparse
import functools
import pathlib
import tempfile

import numpy as np
import numpy.typing as npt
import pandas as pd
import scipy.signal as sps
from loguru import logger

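# jo's data analysis tool: load a capture CSV, apply exactly one of the
# operations defined below, and save the result.
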
def setup_args():
    ap = argparse.ArgumentParser(description="jo's cool data analysis tool")

    ap.add_argument("input", type=pathlib.Path, help="The input file.")
    ap.add_argument(
        "--output",
        "-o",
        type=pathlib.Path,
        default=pathlib.Path(tempfile.mkstemp(suffix=".csv")[1]),
        help="Output file. If omitted, a random temporary file is created "
        "and its path is logged before the results are saved.",
    )

    # The CLI options below select which process to apply to the data; a
    # mutually exclusive group ensures only one operation happens at a time
    operation = ap.add_mutually_exclusive_group(required=True)
    operation.add_argument(
        "--extract",
        "-e",
        type=int,
        nargs="+",
        help="A list of column indices to extract",
    )
    operation.add_argument(
        "--integrate", "-i", action="store_true", help="Cumulatively integrate the data"
    )
    operation.add_argument(
        "--zero-crossing", "-z", action="store_true", help="Find zero crossings"
    )
    operation.add_argument(
        "--butt", "-b", action="store_true", help="Apply a Butterworth filter"
    )
    operation.add_argument(
        "--peaks", "-p", action="store_true", help="Detect peaks in the data"
    )

    return ap

def read_input(f: pathlib.Path) -> npt.NDArray[np.float64]:
    df = pd.read_csv(f, skiprows=4, low_memory=False)
    # The file contains a second section that begins at a "Trajectories"
    # marker row; keep only the rows above that marker
    ind = df[df.iloc[:, 0] == "Trajectories"].index
    df = df.iloc[: ind.values[0], :]

    return df.to_numpy(np.float64)

def store_data(arr: npt.NDArray[np.float64], f: pathlib.Path):
    np.savetxt(f, arr)

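# Each operation below takes the loaded array and returns the array that is
# handed to store_data.
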
def integrate(arr: npt.NDArray[np.float64]):
    # Running (rectangle-rule) integral; samples run down the rows, so
    # accumulate along axis 0
    return np.cumsum(arr, axis=0)

def zero_crossing(arr: npt.NDArray[np.float64]):
    # A sign change between consecutive samples marks a crossing; the first
    # element of np.where holds the row (sample) indices of those changes
    return np.where(np.diff(np.sign(arr), axis=0))[0]

def butt(arr: npt.NDArray[np.float64]):
    # Minimal Butterworth sketch: the order (4) and normalized cutoff (0.1)
    # are assumed placeholders, and filtfilt is used so the filtered signal
    # stays phase-aligned with the input
    b, a = sps.butter(4, 0.1)
    return sps.filtfilt(b, a, arr, axis=0)

def peaks(arr: npt.NDArray[np.float64]):
    # find_peaks only accepts a 1-D signal, so detect peaks in the first
    # column (an assumption; multi-column results would be ragged)
    return sps.find_peaks(arr[:, 0])[0]

def extract(arr: npt.NDArray[np.float64], columns: list[int]):
    return arr[:, columns]

if __name__ == "__main__":
    ap = setup_args()

    args = ap.parse_args()
    logger.info("Arguments have been parsed. Starting calculations.")

    in_arr = read_input(args.input)
    logger.debug(
        f"Successfully loaded {args.input} with shape = {in_arr.shape} and "
        f"data type = {in_arr.dtype}"
    )

    if args.integrate:
        logger.info("Beginning integration")
        fn = integrate
    elif args.zero_crossing:
        logger.info("Beginning zero-crossing detection")
        fn = zero_crossing
    elif args.butt:
        logger.info("Applying Butterworth filter")
        fn = butt
    elif args.peaks:
        logger.info("Beginning peak detection")
        fn = peaks
    elif args.extract:
        logger.info(f"Beginning extraction of columns {args.extract}")
        # partial keeps the underlying function inspectable, unlike a lambda
        fn = functools.partial(extract, columns=args.extract)
    else:
        # Unreachable in practice: argparse enforces that one operation is set
        raise RuntimeError("No operation was selected")

    out_arr = fn(in_arr)
    logger.debug(
        "Successfully applied the selected operation. Output array has "
        f"shape = {out_arr.shape}"
    )
    logger.info(f"Saving data to {args.output}")
    store_data(out_arr, args.output)
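
# Example invocations (file names are hypothetical):
#   python analyze.py capture.csv --integrate -o integrated.csv
#   python analyze.py capture.csv --extract 0 2 5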