Last active
August 27, 2024 18:18
-
-
Save JoschD/42c3509e401e6fc312bf2d305601b5e2 to your computer and use it in GitHub Desktop.
Separate two Signals
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Signal Separation | |
----------------- | |
To install the dependencies, run: | |
``` | |
conda create -n signal_separation python=3.12 pandas matplotlib numpy | |
conda activate signal_separation | |
``` | |
or to install in an existing environment: | |
``` | |
pip install pandas matplotlib numpy | |
``` | |
""" | |
import pandas as pd | |
import numpy as np | |
from matplotlib import pyplot as plt | |
import matplotlib as mpl | |
TIME, SIGNAL, FRAME = "TIME", "SIGNAL", "FRAME" | |
# Step, margin, boundary and delta_peak are estimated by looking at the signal. | |
# The next minimum should be in the range between 50 and 110 entries/frames away. | |
STEP = 80 | |
MARGIN = 30 | |
AMPLITUDE_BOUNDARY = 5000 # to switch between the two signals | |
DELTA_PEAK = 2 # Hz around the peak | |
# Input ------------------------------------------------------------------------------------------- | |
def convert_to_s(time: str) -> float: | |
"""Convert the time string to seconds, but as floating point number.""" | |
h, m, s = time.split(":") | |
return int(h) * 60*60 + int(m) * 60 + float(s) | |
def read_file(path: str, nrows: int = None) -> pd.DataFrame: | |
"""Read the csv file and convert the time to seconds.""" | |
df = pd.read_csv(path, nrows=nrows, usecols=[0, 1, 2], header=0, engine="c") | |
df.columns = [TIME, SIGNAL, FRAME] # rename for simpler access | |
df[TIME] = df[TIME].apply(convert_to_s) # convert the time strings | |
return df | |
# Analysis Functions ------------------------------------------------------------------------------- | |
def find_minima(df: pd.DataFrame, method: str = "switch") -> tuple: | |
# Create a mask, where True means it belongs to one signal, False means it belongs to the other | |
signal_mask = np.zeros(len(df), dtype=bool) | |
# collect the minima in this array. | |
# np.argmin returns the index of the minimum value in the given array | |
# Find the first minimum in the first step | |
min_indcs = [np.argmin(df[SIGNAL][:STEP])] | |
signal_mask = assign_signal(slice(0, min_indcs[-1]), df, signal_mask, method) | |
# Now find the minima in the rest of the signal | |
while((min_indcs[-1] + STEP + MARGIN) < len(df)): | |
print(f"Current Index {min_indcs[-1]}") | |
check_index = min_indcs[-1] + STEP # check +- margin around this index | |
min_index_in_checked = np.argmin(df[SIGNAL][check_index-MARGIN:check_index+MARGIN]) # index of the new minimum in the checked area | |
min_index = check_index - MARGIN + min_index_in_checked # index of the new minimum in the DataFrame | |
signal_mask = assign_signal(slice(min_indcs[-1], min_index), df, signal_mask, method) | |
# update for next iteration --- | |
min_indcs.append(min_index) # start at the new minimum | |
# assign signal from the last minimum to the end | |
signal_mask = assign_signal(slice(min_indcs[-1], len(df)), df, signal_mask, method) | |
return signal_mask, min_indcs | |
def do_fft(df: pd.DataFrame) -> tuple[np.ndarray, np.ndarray]: | |
"""Do a fourier transform of the signal.""" | |
n = len(df[SIGNAL]) | |
delta_t = 1 / get_frequency_range(df) | |
yf = np.fft.rfft(df[SIGNAL]) # only approximately correct, as your samples are not equally spaced | |
xf = np.fft.rfftfreq(n, d=delta_t) | |
# normalize | |
yf = yf * 2 / n | |
return xf, yf | |
def get_frequency_range(df: pd.DataFrame) -> float: | |
"""Get the frequency range F of the signal, in Hz. | |
F = 1 / delta_t with delta_t ~ (t_end - t_start) / N_samples | |
""" | |
return len(df) / (df[TIME].iloc[-1] - df[TIME].iloc[0]) | |
def assign_signal(idx_slice: slice, df: pd.DataFrame, signal_mask: np.ndarray, method: str) -> np.ndarray: | |
"""Assign the signal identifier (True or False) to the signal mask.""" | |
if method == "switch": | |
# method 1: switch signal assignment after each minimum --- | |
signal_mask[idx_slice] = not signal_mask[idx_slice.start-1] # opposite of what was before | |
else: | |
# method 2: assign to the signal depending on the maximum amplitude --- | |
signal_mask[idx_slice] = df.loc[idx_slice.start:idx_slice.stop, SIGNAL].max() < AMPLITUDE_BOUNDARY | |
return signal_mask | |
def reconstruct_signal_from_fft(xf: np.ndarray, yf: np.ndarray, df: pd.DataFrame) -> pd.DataFrame: | |
"""Reconstruct the signal from the highest peaks in the fft. | |
Todo: We could filter some noise before doing this. | |
""" | |
# number of lines around the highest peak | |
delta_f = get_frequency_range(df) / len(df) | |
n_lines = int(DELTA_PEAK / delta_f) | |
signals = [None] * 3 | |
yf_abs = np.abs(yf) | |
# remove the frequencies around the offset | |
signals[0] = yf[0] | |
yf_abs[:n_lines] = 0 | |
for i_signal in range(1, len(signals)): | |
peak_idx = np.argmax(yf_abs) | |
signals[i_signal] = yf[peak_idx] * np.exp(1j * 2 * np.pi * xf[peak_idx] * df[TIME].values) | |
# remove the frequencies around the highest peak | |
yf_abs[peak_idx-n_lines:peak_idx+n_lines] = 0 | |
return signals | |
# Plotting functions ----------------------------------------------------------- | |
def plot_original(df: pd.DataFrame) -> mpl.figure.Figure: | |
"""Plot the original signal.""" | |
fig, ax = plt.subplots(1, 1) | |
ax.plot(df[TIME], df[SIGNAL]) | |
ax.set_xlabel("Time [s]") | |
ax.set_ylabel("Signal") | |
return fig | |
def plot_fft(df: pd.DataFrame, xf: np.ndarray, yf: np.ndarray) -> mpl.figure.Figure: | |
"""Plot the fft of the signal.""" | |
fig, ax = plt.subplots(1, 1) | |
ax.plot(xf, np.abs(yf)) | |
ax.set_xlabel("Frequency [Hz]") | |
ax.set_ylabel("Amplitude") | |
ax.set_xlim(0, None) | |
return fig | |
def plot_original_and_fft_approximation(df: pd.DataFrame, xf: np.ndarray, yf: np.ndarray) -> mpl.figure.Figure: | |
"""Plot the original signal and a reconstructed signal from the two highest peaks in the fft.""" | |
reconstructed = reconstruct_signal_from_fft(xf, yf, df) | |
fig, ax = plt.subplots(1, 1) | |
# Original --- | |
ax.plot(df[TIME], df[SIGNAL], label="Signal") | |
# Reconstructions --- | |
for idx,rec_signal in enumerate(reconstructed[1:]): | |
ax.plot(df[TIME], np.real(rec_signal+reconstructed[0]), label=f"Reconstructed signal part {idx+1}", ls="--") | |
ax.plot(df[TIME], np.real(sum(reconstructed)), label="Reconstructed signal", ls="--") | |
# Axes --- | |
ax.set_xlabel("Time [s]") | |
ax.set_ylabel("Signal") | |
ax.legend() | |
return fig | |
def plot_both_signals(df: pd.DataFrame, signal_mask: np.ndarray, min_indcs: list) -> mpl.figure.Figure: | |
"""Plot the two separated signals and the found minima. | |
Hints: | |
- df.loc[mask, column] selects the rows where mask is True. | |
- The ~ operator inverts the mask (~[True, False, False] = [False, True, True]), | |
i.e. df.loc[~mask, column] selects the rows where mask is False | |
""" | |
fig, ax = plt.subplots(1,1) | |
ax.plot(df.loc[signal_mask, TIME], df.loc[signal_mask,SIGNAL], label="Signal 1", ls="-") | |
ax.plot(df.loc[~signal_mask, TIME], df.loc[~signal_mask,SIGNAL], label="Signal 2", ls="-") | |
ax.plot( | |
df.loc[min_indcs, TIME], df.loc[min_indcs,SIGNAL], label="_min", ls="none", | |
marker="o", markerfacecolor="none", color="red" | |
) | |
ax.set_xlabel("Time [s]") | |
ax.set_ylabel("Signal") | |
return fig | |
# Run the program -------------------------------------------------------------- | |
def main(): | |
"""Main function to run. """ | |
df = read_file(path="21082024_calibration.csv", nrows=10000) # nrows=None -> read whole file | |
# ---- Only do with a subset of the data ---- | |
plot_original(df) | |
xf, yf = do_fft(df) | |
plot_fft(df, xf, yf) | |
plot_original_and_fft_approximation(df, xf, yf) | |
# ------------------------------------------------- | |
# signal_mask, min_indcs = find_minima(df, method="boundary") | |
signal_mask, min_indcs = find_minima(df, method="switch") | |
plot_both_signals(df, signal_mask, min_indcs) | |
plt.show() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment