Skip to content

Instantly share code, notes, and snippets.

@JoschD
Last active August 27, 2024 18:18
Show Gist options
  • Save JoschD/42c3509e401e6fc312bf2d305601b5e2 to your computer and use it in GitHub Desktop.
Save JoschD/42c3509e401e6fc312bf2d305601b5e2 to your computer and use it in GitHub Desktop.
Separate two Signals
"""
Signal Separation
-----------------
To install the dependencies, run:
```
conda create -n signal_separation python=3.12 pandas matplotlib numpy
conda activate signal_separation
```
or to install in an existing environment:
```
pip install pandas matplotlib numpy
```
"""
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
import matplotlib as mpl
TIME, SIGNAL, FRAME = "TIME", "SIGNAL", "FRAME"
# Step, margin, boundary and delta_peak are estimated by looking at the signal.
# The next minimum should be in the range between 50 and 110 entries/frames away.
STEP = 80
MARGIN = 30
AMPLITUDE_BOUNDARY = 5000 # to switch between the two signals
DELTA_PEAK = 2 # Hz around the peak
# Input -------------------------------------------------------------------------------------------
def convert_to_s(time: str) -> float:
"""Convert the time string to seconds, but as floating point number."""
h, m, s = time.split(":")
return int(h) * 60*60 + int(m) * 60 + float(s)
def read_file(path: str, nrows: int = None) -> pd.DataFrame:
"""Read the csv file and convert the time to seconds."""
df = pd.read_csv(path, nrows=nrows, usecols=[0, 1, 2], header=0, engine="c")
df.columns = [TIME, SIGNAL, FRAME] # rename for simpler access
df[TIME] = df[TIME].apply(convert_to_s) # convert the time strings
return df
# Analysis Functions -------------------------------------------------------------------------------
def find_minima(df: pd.DataFrame, method: str = "switch") -> tuple:
# Create a mask, where True means it belongs to one signal, False means it belongs to the other
signal_mask = np.zeros(len(df), dtype=bool)
# collect the minima in this array.
# np.argmin returns the index of the minimum value in the given array
# Find the first minimum in the first step
min_indcs = [np.argmin(df[SIGNAL][:STEP])]
signal_mask = assign_signal(slice(0, min_indcs[-1]), df, signal_mask, method)
# Now find the minima in the rest of the signal
while((min_indcs[-1] + STEP + MARGIN) < len(df)):
print(f"Current Index {min_indcs[-1]}")
check_index = min_indcs[-1] + STEP # check +- margin around this index
min_index_in_checked = np.argmin(df[SIGNAL][check_index-MARGIN:check_index+MARGIN]) # index of the new minimum in the checked area
min_index = check_index - MARGIN + min_index_in_checked # index of the new minimum in the DataFrame
signal_mask = assign_signal(slice(min_indcs[-1], min_index), df, signal_mask, method)
# update for next iteration ---
min_indcs.append(min_index) # start at the new minimum
# assign signal from the last minimum to the end
signal_mask = assign_signal(slice(min_indcs[-1], len(df)), df, signal_mask, method)
return signal_mask, min_indcs
def do_fft(df: pd.DataFrame) -> tuple[np.ndarray, np.ndarray]:
"""Do a fourier transform of the signal."""
n = len(df[SIGNAL])
delta_t = 1 / get_frequency_range(df)
yf = np.fft.rfft(df[SIGNAL]) # only approximately correct, as your samples are not equally spaced
xf = np.fft.rfftfreq(n, d=delta_t)
# normalize
yf = yf * 2 / n
return xf, yf
def get_frequency_range(df: pd.DataFrame) -> float:
"""Get the frequency range F of the signal, in Hz.
F = 1 / delta_t with delta_t ~ (t_end - t_start) / N_samples
"""
return len(df) / (df[TIME].iloc[-1] - df[TIME].iloc[0])
def assign_signal(idx_slice: slice, df: pd.DataFrame, signal_mask: np.ndarray, method: str) -> np.ndarray:
"""Assign the signal identifier (True or False) to the signal mask."""
if method == "switch":
# method 1: switch signal assignment after each minimum ---
signal_mask[idx_slice] = not signal_mask[idx_slice.start-1] # opposite of what was before
else:
# method 2: assign to the signal depending on the maximum amplitude ---
signal_mask[idx_slice] = df.loc[idx_slice.start:idx_slice.stop, SIGNAL].max() < AMPLITUDE_BOUNDARY
return signal_mask
def reconstruct_signal_from_fft(xf: np.ndarray, yf: np.ndarray, df: pd.DataFrame) -> pd.DataFrame:
"""Reconstruct the signal from the highest peaks in the fft.
Todo: We could filter some noise before doing this.
"""
# number of lines around the highest peak
delta_f = get_frequency_range(df) / len(df)
n_lines = int(DELTA_PEAK / delta_f)
signals = [None] * 3
yf_abs = np.abs(yf)
# remove the frequencies around the offset
signals[0] = yf[0]
yf_abs[:n_lines] = 0
for i_signal in range(1, len(signals)):
peak_idx = np.argmax(yf_abs)
signals[i_signal] = yf[peak_idx] * np.exp(1j * 2 * np.pi * xf[peak_idx] * df[TIME].values)
# remove the frequencies around the highest peak
yf_abs[peak_idx-n_lines:peak_idx+n_lines] = 0
return signals
# Plotting functions -----------------------------------------------------------
def plot_original(df: pd.DataFrame) -> mpl.figure.Figure:
"""Plot the original signal."""
fig, ax = plt.subplots(1, 1)
ax.plot(df[TIME], df[SIGNAL])
ax.set_xlabel("Time [s]")
ax.set_ylabel("Signal")
return fig
def plot_fft(df: pd.DataFrame, xf: np.ndarray, yf: np.ndarray) -> mpl.figure.Figure:
"""Plot the fft of the signal."""
fig, ax = plt.subplots(1, 1)
ax.plot(xf, np.abs(yf))
ax.set_xlabel("Frequency [Hz]")
ax.set_ylabel("Amplitude")
ax.set_xlim(0, None)
return fig
def plot_original_and_fft_approximation(df: pd.DataFrame, xf: np.ndarray, yf: np.ndarray) -> mpl.figure.Figure:
"""Plot the original signal and a reconstructed signal from the two highest peaks in the fft."""
reconstructed = reconstruct_signal_from_fft(xf, yf, df)
fig, ax = plt.subplots(1, 1)
# Original ---
ax.plot(df[TIME], df[SIGNAL], label="Signal")
# Reconstructions ---
for idx,rec_signal in enumerate(reconstructed[1:]):
ax.plot(df[TIME], np.real(rec_signal+reconstructed[0]), label=f"Reconstructed signal part {idx+1}", ls="--")
ax.plot(df[TIME], np.real(sum(reconstructed)), label="Reconstructed signal", ls="--")
# Axes ---
ax.set_xlabel("Time [s]")
ax.set_ylabel("Signal")
ax.legend()
return fig
def plot_both_signals(df: pd.DataFrame, signal_mask: np.ndarray, min_indcs: list) -> mpl.figure.Figure:
"""Plot the two separated signals and the found minima.
Hints:
- df.loc[mask, column] selects the rows where mask is True.
- The ~ operator inverts the mask (~[True, False, False] = [False, True, True]),
i.e. df.loc[~mask, column] selects the rows where mask is False
"""
fig, ax = plt.subplots(1,1)
ax.plot(df.loc[signal_mask, TIME], df.loc[signal_mask,SIGNAL], label="Signal 1", ls="-")
ax.plot(df.loc[~signal_mask, TIME], df.loc[~signal_mask,SIGNAL], label="Signal 2", ls="-")
ax.plot(
df.loc[min_indcs, TIME], df.loc[min_indcs,SIGNAL], label="_min", ls="none",
marker="o", markerfacecolor="none", color="red"
)
ax.set_xlabel("Time [s]")
ax.set_ylabel("Signal")
return fig
# Run the program --------------------------------------------------------------
def main():
"""Main function to run. """
df = read_file(path="21082024_calibration.csv", nrows=10000) # nrows=None -> read whole file
# ---- Only do with a subset of the data ----
plot_original(df)
xf, yf = do_fft(df)
plot_fft(df, xf, yf)
plot_original_and_fft_approximation(df, xf, yf)
# -------------------------------------------------
# signal_mask, min_indcs = find_minima(df, method="boundary")
signal_mask, min_indcs = find_minima(df, method="switch")
plot_both_signals(df, signal_mask, min_indcs)
plt.show()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment