Skip to content

Instantly share code, notes, and snippets.

@tam17aki
Last active August 19, 2025 16:02
Show Gist options
  • Save tam17aki/bc5647c04addcf7788847ee5aafd5020 to your computer and use it in GitHub Desktop.
Save tam17aki/bc5647c04addcf7788847ee5aafd5020 to your computer and use it in GitHub Desktop.
A demonstration of frequency estimation using the ESPRIT algorithm
# -*- coding: utf-8 -*-
"""A demonstration of frequency estimation using the ESPRIT algorithm.
Copyright (C) 2025 by Akira TAMAMORI
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import argparse
import warnings
import numpy as np
import numpy.typing as npt
from scipy.linalg import eigh, eigvals, hankel, pinv
def synthesize_sinusoids(
fs: float,
duration: float,
freqs: npt.NDArray[np.float64],
amp_range: tuple[float, float] = (0.2, 1.2),
rng: np.random.Generator | None = None,
) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64], npt.NDArray[np.float64]]:
"""Generate a clean signal from multiple sinusoids with random amps and phases.
Args:
fs (float): Sampling frequency in Hz.
duration (float): Signal duration in seconds.
freqs (np.ndarray): Array of sinusoidal frequencies in Hz.
amp_range (tuple[float, float]): Min and max amplitude range.
rng (np.random.Generator, optional): Random generator for reproducibility.
Returns:
tuple[np.ndarray, np.ndarray, np.ndarray]:
- clean_signal: Sum of sinusoids (float64).
- amps: Amplitudes of each sinusoid.
- phases: Phases of each sinusoid in radian.
"""
if rng is None:
rng = np.random.default_rng()
t = np.linspace(0, duration, int(fs * duration), endpoint=False)
amps = rng.uniform(amp_range[0], amp_range[1], freqs.size).astype(np.float64)
phases = rng.uniform(-np.pi, np.pi, freqs.size).astype(np.float64)
components = [
a * np.cos(2 * np.pi * f * t + p) for f, a, p in zip(freqs, amps, phases)
]
clean_signal = np.asarray(np.sum(components, axis=0, dtype=np.float64))
return clean_signal, amps, phases
def add_awgn(
signal: npt.NDArray[np.float64],
snr_db: float,
rng: np.random.Generator | None = None,
) -> npt.NDArray[np.float64]:
"""Add Additive White Gaussian Noise (AWGN) to a given signal.
Args:
signal (np.ndarray): Input clean signal.
snr_db (float): Target signal-to-noise ratio in dB.
rng (np.random.Generator, optional): Random generator.
Returns:
np.ndarray: Noisy signal with specified SNR.
"""
if rng is None:
rng = np.random.default_rng()
signal_power = np.var(signal)
noise_power = signal_power / (10 ** (snr_db / 10))
noise = rng.normal(0.0, np.sqrt(noise_power), signal.size)
return signal + noise
def generate_test_signal(
fs: float,
duration: float,
f_true: npt.NDArray[np.float64],
snr_db: float,
amp_range: tuple[float, float] = (0.2, 1.2),
) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64], npt.NDArray[np.float64]]:
"""Generate a noisy test signal consisting of multiple sinusoids and AWGN.
Args:
fs (float): Sampling frequency in Hz.
duration (float): Signal duration in seconds.
f_true (np.ndarray): Array of sinusoidal frequencies in Hz.
snr_db (float): Target signal-to-noise ratio in dB.
amp_range (tuple[float, float]): Min and max amplitude range.
Returns:
tuple[np.ndarray, np.ndarray, np.ndarray]:
- noisy_signal (np.ndarray of float64): Generated test signal.
- amps (np.ndarray of float64): Random amplitudes assigned to each sinusoid.
- phases (np.ndarray of float64): Random phases assigned to each sinusoid.
"""
clean_signal, amps, phases = synthesize_sinusoids(fs, duration, f_true, amp_range)
noisy_signal = add_awgn(clean_signal, snr_db)
return noisy_signal, amps, phases
def _build_covariance_matrix(
frame: npt.NDArray[np.complex128], subspace_dim: int
) -> npt.NDArray[np.complex128]:
"""Build the covariance matrix from the input frame."""
n_samples = frame.size
n_snapshots = n_samples - subspace_dim + 1
hankel_matrix = hankel(frame[:subspace_dim], frame[subspace_dim - 1 :])
_cov_matrix = (hankel_matrix @ hankel_matrix.conj().T) / n_snapshots
cov_matrix: npt.NDArray[np.complex128] = _cov_matrix.astype(np.complex128)
return cov_matrix
def _estimate_signal_subspace(
cov_matrix: npt.NDArray[np.complex128], model_order: int
) -> npt.NDArray[np.complex128] | None:
"""Estimate the signal subspace using eigenvalue decomposition."""
try:
_, eigenvectors = eigh(cov_matrix)
except np.linalg.LinAlgError:
warnings.warn("Eigenvalue decomposition on covariance matrix failed.")
return None
signal_subspace: npt.NDArray[np.complex128] = eigenvectors[:, -model_order:]
return signal_subspace
def _solve_params_from_subspace(
signal_subspace: npt.NDArray[np.complex128], fs: float
) -> npt.NDArray[np.float64]:
"""Solve for frequencies from the signal subspace."""
subspace_upper = signal_subspace[:-1, :]
subspace_lower = signal_subspace[1:, :]
try:
rotation_operator_psi = pinv(subspace_upper) @ subspace_lower
except np.linalg.LinAlgError:
warnings.warn("TLS matrix inversion failed in parameter solving.")
return np.array([])
try:
eigenvalues_psi = eigvals(rotation_operator_psi)
except np.linalg.LinAlgError:
warnings.warn(
"Eigenvalue decomposition failed while solving rotation operator."
)
return np.array([])
angles = np.angle(eigenvalues_psi)
estimated_freqs_hz = angles * (fs / (2 * np.pi))
return np.sort([f for f in estimated_freqs_hz if f > 0])
def estimate_frequencies_esprit(
frame: npt.NDArray[np.complex128],
fs: float,
n_real_sinusoids: int,
separation_factor: float = 0.4,
) -> npt.NDArray[np.float64]:
"""Estimate frequencies of multiple non-damped sinusoids using ESPRIT.
Args:
frame (np.ndarray):
One-dimensional input signal frame (time-domain samples), dtype=complex128.
fs (float):
Sampling frequency in Hz.
n_real_sinusoids (int):
Number of sinusoidal components to estimate.
separation_factor (float, optional):
A factor to determine the minimum separation between estimated frequencies,
relative to the FFT resolution limit (fs / n_samples). A value of 0.5
corresponds to half the FFT resolution. Lowering this value (e.g., to 0.4)
can help resolve very closely spaced sinusoids that ESPRIT is capable of
separating. Defaults to 0.4.
Returns:
np.ndarray:
Array of estimated frequencies in Hz (dtype=float64).
Returns an empty array if estimation fails.
"""
model_order = 2 * n_real_sinusoids
n_samples = frame.size
# subspace_dim is also called the pencil parameter M
subspace_dim = n_samples // 3
if subspace_dim <= model_order or subspace_dim >= n_samples - model_order:
warnings.warn("Invalid subspace dimension for ESPRIT. Returning empty result.")
return np.array([])
# 1. Build the covariance matrix
cov_matrix = _build_covariance_matrix(frame, subspace_dim)
# 2. Estimate the signal subspace
signal_subspace = _estimate_signal_subspace(cov_matrix, model_order)
if signal_subspace is None:
warnings.warn("Failed to estimate signal subspace. Returning empty result.")
return np.array([])
# 3. Find frequencies from subspace
estimated_freqs = _solve_params_from_subspace(signal_subspace, fs)
if estimated_freqs.size == 0:
warnings.warn("No valid frequencies estimated. Returning empty result.")
return np.array([])
unique_freqs = [estimated_freqs[0]]
min_separation_hz = (fs / n_samples) * separation_factor
for f in estimated_freqs[1:]:
if np.abs(f - unique_freqs[-1]) > min_separation_hz:
unique_freqs.append(f)
return np.array(unique_freqs[:n_real_sinusoids])
def parse_args() -> argparse.Namespace:
"""Parse command-line arguments for ESPRIT demo."""
parser = argparse.ArgumentParser(
description="Frequency estimation demo using ESPRIT algorithm."
)
parser.add_argument(
"--fs",
type=float,
default=44100.0,
help="Sampling frequency in Hz (default: 44100)",
)
parser.add_argument(
"--duration",
type=float,
default=0.04,
help="Signal duration in seconds (default: 0.04)",
)
parser.add_argument(
"--snr_db",
type=float,
default=10.0,
help="Signal-to-noise ratio in dB (default: 10)",
)
parser.add_argument(
"--f_true",
type=float,
nargs="+",
default=[440.0, 460.0, 480.0],
help="List of true frequencies in Hz (space separated). Default: 440 460 480",
)
parser.add_argument(
"--amp_range",
type=float,
nargs=2,
default=[0.2, 1.2],
metavar=("AMP_MIN", "AMP_MAX"),
help="Amplitude range for sinusoid generation (default: 0.2 1.2)",
)
parser.add_argument(
"--sep_factor",
type=float,
default=0.4,
help="Separation factor for resolving close frequencies, "
+ "relative to FFT resolution (fs / n_samples). "
+ "A value < 0.5 can help separate frequencies closer than the FFT limit. "
+ "(default: 0.4)",
)
return parser.parse_args()
def main() -> None:
"""Perform demonstration."""
args = parse_args()
fs = args.fs
duration = args.duration
snr_db = args.snr_db
f_true = np.array(args.f_true, dtype=np.float64)
amp_range = tuple(args.amp_range)
separation_factor = args.sep_factor
n_sinusoids = f_true.size
noisy_signal, amps, phases = generate_test_signal(
fs, duration, f_true, snr_db, amp_range
)
print("--- Experiment Setup ---")
print(f"Sampling Frequency: {fs} Hz")
print(f"Signal Duration: {duration*1000:.0f} ms")
print(f"True Frequencies: {f_true} Hz")
print(f"Amplitudes: {amps}")
print(f"Phases: {phases} rad")
print(f"Separation Factor: {separation_factor}")
print(f"SNR: {snr_db} dB\n")
print("--- Running ESPRIT ---")
esprit_freqs = estimate_frequencies_esprit(
noisy_signal.astype(np.complex128), fs, n_sinusoids, separation_factor
)
print(f"Estimated (ESPRIT): {esprit_freqs}")
if esprit_freqs.size == n_sinusoids:
print(f"Errors (ESPRIT): {esprit_freqs - f_true}\n")
else:
print(
f"Warning: Only found {esprit_freqs.size} peaks, "
+ f"but expected {n_sinusoids}."
)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment