Skip to content

Instantly share code, notes, and snippets.

@tam17aki
Last active August 21, 2025 08:39
Show Gist options
  • Save tam17aki/70ecb7a641b91890fa79d4660c32c7ce to your computer and use it in GitHub Desktop.
Save tam17aki/70ecb7a641b91890fa79d4660c32c7ce to your computer and use it in GitHub Desktop.
A demonstration of parameter estimation for sinusoidal signals via the MUSIC algorithm and the least squares method.
# -*- coding: utf-8 -*-
"""A demonstration of parameter estimation for sinusoidal signals.
Frequencies are estimated using the MUSIC algorithm, followed by
amplitude and phase estimation via the least squares method.
Copyright (C) 2025 by Akira TAMAMORI
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import argparse
import warnings
from dataclasses import dataclass
import numpy as np
import numpy.typing as npt
from scipy.linalg import eigh, hankel, pinv
from scipy.signal import find_peaks
@dataclass(frozen=True)
class SinusoidParameters:
"""Represent the parameters of multiple sinusoids."""
frequencies: npt.NDArray[np.float64]
amplitudes: npt.NDArray[np.float64]
phases: npt.NDArray[np.float64]
@dataclass(frozen=True)
class ExperimentConfig:
"""Store the configuration for a signal processing experiment."""
fs: float
duration: float
snr_db: float
freqs_true: npt.NDArray[np.float64]
amp_range: tuple[float, float]
n_grids: int
@property
def n_sinusoids(self) -> int:
"""Return the number of sinusoids."""
return self.freqs_true.size
def generate_amps_phases(
amp_range: tuple[float, float],
n_sinusoids: int,
rng: np.random.Generator | None = None,
) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]:
"""Generate amplitudes and phases for multiple sinusoids.
Args:
amp_range (tuple of float64): Lower and upper bound for amplitude.
n_sinusoids (int): Number of sinusoids.
rng (np.random.Generator, optional): Random generator.
Returns:
tuple[np.ndarray, np.ndarray]:
- amps (np.ndarray of float64): Random amplitudes assigned to each sinusoid.
- phases (np.ndarray of float64): Random phases assigned to each sinus
"""
if rng is None:
rng = np.random.default_rng()
amps = rng.uniform(amp_range[0], amp_range[1], n_sinusoids).astype(np.float64)
phases = rng.uniform(-np.pi, np.pi, n_sinusoids).astype(np.float64)
return amps, phases
def synthesize_sinusoids(
fs: float, duration: float, params: SinusoidParameters
) -> npt.NDArray[np.float64]:
"""Generate a clean signal from multiple sinusoids.
Args:
fs (float): Sampling frequency in Hz.
duration (float): Signal duration in seconds.
params (SinusoidParameters): Parametes of mutiple sinusoids.
Returns:
clean_signal (np.ndarray): Sum of multiple sinusoids (float64).
"""
t = np.linspace(0, duration, int(fs * duration), endpoint=False)
clean_signal = np.zeros(t.size, dtype=np.float64)
for f, a, p in zip(params.frequencies, params.amplitudes, params.phases):
clean_signal += a * np.cos(2 * np.pi * f * t + p)
return clean_signal
def add_awgn(
signal: npt.NDArray[np.float64],
snr_db: float,
rng: np.random.Generator | None = None,
) -> npt.NDArray[np.float64]:
"""Add Additive White Gaussian Noise (AWGN) to a given signal.
Args:
signal (np.ndarray): Input clean signal.
snr_db (float): Target signal-to-noise ratio in dB.
rng (np.random.Generator, optional): Random generator.
Returns:
np.ndarray: Noisy signal with specified SNR.
"""
if rng is None:
rng = np.random.default_rng()
signal_power = np.var(signal)
noise_power = signal_power / (10 ** (snr_db / 10))
noise = rng.normal(0.0, np.sqrt(noise_power), signal.size)
return signal + noise
def generate_test_signal(
fs: float, duration: float, snr_db: float, params: SinusoidParameters
) -> npt.NDArray[np.float64]:
"""Generate a noisy test signal consisting of multiple sinusoids and AWGN.
Args:
fs (float): Sampling frequency in Hz.
duration (float): Signal duration in seconds.
snr_db (float): Target signal-to-noise ratio in dB.
params (SinusoidParameters): Parametes of mutiple sinusoids.
Returns:
noisy_signal (np.ndarray of float64): Generated test signal.
"""
clean_signal = synthesize_sinusoids(fs, duration, params)
noisy_signal = add_awgn(clean_signal, snr_db)
return noisy_signal
def _build_covariance_matrix(
frame: npt.NDArray[np.complex128], subspace_dim: int
) -> npt.NDArray[np.complex128]:
"""Build the covariance matrix from the input frame."""
n_samples = frame.size
n_snapshots = n_samples - subspace_dim + 1
hankel_matrix = hankel(frame[:subspace_dim], frame[subspace_dim - 1 :])
_cov_matrix = (hankel_matrix @ hankel_matrix.conj().T) / n_snapshots
cov_matrix: npt.NDArray[np.complex128] = _cov_matrix.astype(np.complex128)
return cov_matrix
def _estimate_noise_subspace(
frame: npt.NDArray[np.complex128], subspace_dim: int, model_order: int
) -> npt.NDArray[np.complex128] | None:
"""Build the covariance matrix and estimates the noise subspace."""
# 1. Build the covariance matrix (using the FB method)
cov_matrix = _build_covariance_matrix(frame, subspace_dim)
# 2. Eigenvalue decomposition
try:
_, eigenvectors = eigh(cov_matrix)
except np.linalg.LinAlgError:
warnings.warn("Eigenvalue decomposition on covariance matrix failed.")
return None
# The noise subspace is the set of vectors corresponding to the smaller eigenvalues
# Since it is in ascending order, select (subspace_dim - model_order) vectors
# from the beginning
n_noise_vectors = subspace_dim - model_order
_subspace = eigenvectors[:, :n_noise_vectors]
noise_subspace: npt.NDArray[np.complex128] = _subspace.astype(np.complex128)
return noise_subspace
def _calculate_music_spectrum(
fs: float,
subspace_dim: int,
noise_subspace: npt.NDArray[np.complex128],
n_grid_points: int,
) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]:
"""Calculate the MUSIC pseudospectrum over a frequency grid."""
freq_grid: npt.NDArray[np.float64] = np.linspace(
0, fs / 2, num=n_grid_points, dtype=np.float64
)
music_spectrum = np.zeros(freq_grid.size)
# G*G^H only needs to be calculated once
projector_onto_noise = noise_subspace @ noise_subspace.conj().T
for i, f in enumerate(freq_grid):
omega = 2 * np.pi * f / fs
# Calculate a steering vector a(ω)
steering_vector = np.exp(-1j * omega * np.arange(subspace_dim))
# Calculate the denominator a^H * (G*G^H) * a
steering_vector_h = steering_vector.conj()
denominator = steering_vector_h @ projector_onto_noise @ steering_vector
# Add a small value to avoid division by zero
music_spectrum[i] = 1 / (np.abs(denominator) + 1e-12)
return freq_grid, music_spectrum
def _find_music_peaks(
freq_grid: npt.NDArray[np.float64],
music_spectrum: npt.NDArray[np.float64],
n_real_sinusoids: int,
) -> npt.NDArray[np.float64]:
"""Find the N strongest peaks from the MUSIC spectrum."""
# 1. Find all "local maxima" as peak candidates without using prominence.
# Ignores extremely small noise floor fluctuations.
all_peaks, _ = find_peaks(music_spectrum, height=np.mean(music_spectrum))
all_peaks = np.array(all_peaks, dtype=np.int64)
if all_peaks.size < n_real_sinusoids:
return freq_grid[all_peaks] if all_peaks.size > 0 else np.array([])
# 2. From all the peak candidates found, select N peaks
# with the highest spectral values.
strongest_peak_indices = all_peaks[
np.argsort(music_spectrum[all_peaks])[-n_real_sinusoids:]
]
estimated_freqs = freq_grid[strongest_peak_indices]
return np.sort(estimated_freqs)
def estimate_frequencies_music(
frame: npt.NDArray[np.complex128],
fs: float,
n_real_sinusoids: int,
n_grid_points: int = 2048,
) -> npt.NDArray[np.float64]:
"""Estimate frequencies of multiple non-damped sinusoids using MUSIC.
Args:
frame (np.ndarray):
One-dimensional input signal frame (time-domain samples), dtype=complex128.
fs (float):
Sampling frequency in Hz.
n_real_sinusoids (int):
Number of sinusoidal components to estimate.
n_grid_points (int):
Number of frequency grid points for MUSIC spectrum.
Returns:
np.ndarray:
Array of estimated frequencies in Hz (dtype=float64).
Returns an empty array if estimation fails.
"""
model_order = 2 * n_real_sinusoids
n_samples = frame.size
subspace_dim = n_samples // 3
if subspace_dim <= model_order:
warnings.warn("Invalid subspace dimension for MUSIC. Returning empty result.")
return np.array([])
# 1. Estimate the noise subspace
noise_subspace = _estimate_noise_subspace(frame, subspace_dim, model_order)
if noise_subspace is None:
warnings.warn("Failed to estimate noise subspace. Returning empty result.")
return np.array([])
# 2. Calculate the MUSIC spectrum
freq_grid, music_spectrum = _calculate_music_spectrum(
fs, subspace_dim, noise_subspace, n_grid_points
)
# 3. Detecting peaks from a spectrum
estimated_freqs = _find_music_peaks(freq_grid, music_spectrum, n_real_sinusoids)
return estimated_freqs
def estimate_amplitudes_phases(
signal: npt.NDArray[np.complex128],
fs: float,
estimated_freqs: npt.NDArray[np.float64],
) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]:
"""Estimate amplitudes and phases from frequencies using least squares.
Args:
signal (np.ndarray): Input signal frame (complex128).
fs (float): Sampling frequency in Hz.
estimated_freqs (np.ndarray): Array of estimated frequencies in Hz.
Returns:
tuple[np.ndarray, np.ndarray]:
- estimated_amps (np.ndarray): Estimated amplitudes.
- estimated_phases (np.ndarray): Estimated phases in radians.
"""
n_samples = signal.size
n_sinusoids = estimated_freqs.size
# 1. Build the Vandermonde matrix V
t = np.arange(n_samples) / fs
vandermonde_matrix = np.zeros((n_samples, n_sinusoids), dtype=np.complex128)
for i, freq in enumerate(estimated_freqs):
vandermonde_matrix[:, i] = np.exp(2j * np.pi * freq * t)
# 2. Solve for complex amplitudes c using pseudo-inverse
# y = V @ c => c = pinv(V) @ y
try:
complex_amps = pinv(vandermonde_matrix) @ signal
except np.linalg.LinAlgError:
warnings.warn("Least squares estimation for amplitudes/phases failed.")
return np.array([]), np.array([])
# 3. Extract amplitudes and phases
# For a real-valued sinusoid A*cos(2*pi*f*t + phi), the complex amplitude
# estimated using only the positive frequency is (A/2)*exp(j*phi).
# Therefore, we need to multiply the magnitude by 2.
estimated_amps = 2 * np.abs(complex_amps)
estimated_phases = np.angle(complex_amps).astype(np.float64)
# Sort results according to frequency for consistent comparison
sort_indices = np.argsort(estimated_freqs)
return estimated_amps[sort_indices], estimated_phases[sort_indices]
def print_experiment_setup(
config: ExperimentConfig, true_params: SinusoidParameters
) -> None:
"""Print the setup of the experiment."""
sort_indices = np.argsort(true_params.frequencies)
print("--- Experiment Setup ---")
print(f"Sampling Frequency: {config.fs} Hz")
print(f"Signal Duration: {config.duration * 1000:.0f} ms")
print(f"True Frequencies: {true_params.frequencies[sort_indices]} Hz")
print(f"True Amplitudes: {true_params.amplitudes[sort_indices]}")
print(f"True Phases: {true_params.phases[sort_indices]} rad")
print(f"SNR: {config.snr_db} dB")
print(f"Number of Grid Points: {config.n_grids}")
def print_results(
true_params: SinusoidParameters, est_params: SinusoidParameters
) -> None:
"""Print the results."""
print("\n--- Estimation Results ---")
print(f"Est Frequencies: {est_params.frequencies} Hz")
print(f"Est Amplitudes: {est_params.amplitudes}")
print(f"Est Phases: {est_params.phases} rad")
sort_indices = np.argsort(true_params.frequencies)
freq_errors = est_params.frequencies - true_params.frequencies[sort_indices]
amp_errors = est_params.amplitudes - true_params.amplitudes[sort_indices]
phase_errors = est_params.phases - true_params.phases[sort_indices]
print("\n--- Estimation Errors ---")
print(f"Freq Errors: {freq_errors} Hz")
print(f"Amp Errors: {amp_errors}")
print(f"Phase Errors: {phase_errors} rad\n")
def parse_args() -> argparse.Namespace:
"""Parse command-line arguments for MUSIC demo."""
parser = argparse.ArgumentParser(
description="Parameter estimation demo using MUSIC algorithm."
)
parser.add_argument(
"--fs",
type=float,
default=44100.0,
help="Sampling frequency in Hz (default: 44100.0)",
)
parser.add_argument(
"--duration",
type=float,
default=0.1,
help="Signal duration in seconds (default: 0.1)",
)
parser.add_argument(
"--snr_db",
type=float,
default=30.0,
help="Signal-to-noise ratio in dB (default: 30.0)",
)
parser.add_argument(
"--freqs_true",
type=float,
nargs="+",
default=[440.0, 460.0, 480.0],
help="List of true frequencies in Hz (space separated). "
+ "Default: 440.0 460.0 480.0",
)
parser.add_argument(
"--amp_range",
type=float,
nargs=2,
default=[0.2, 1.2],
metavar=("AMP_MIN", "AMP_MAX"),
help="Amplitude range for sinusoid generation (default: 0.2 1.2)",
)
parser.add_argument(
"--n_grids",
type=int,
default=2048,
help="Number of frequency grid points for MUSIC spectrum (default: 2048)",
)
return parser.parse_args()
def main() -> None:
"""Perform demonstration."""
args = parse_args()
config = ExperimentConfig(
fs=args.fs,
duration=args.duration,
snr_db=args.snr_db,
freqs_true=np.array(args.freqs_true, dtype=np.float64),
amp_range=tuple(args.amp_range),
n_grids=args.n_grids,
)
amps_true, phases_true = generate_amps_phases(config.amp_range, config.n_sinusoids)
true_params = SinusoidParameters(config.freqs_true, amps_true, phases_true)
noisy_signal = generate_test_signal(
config.fs, config.duration, config.snr_db, true_params
)
print_experiment_setup(config, true_params)
print("--- Running MUSIC ---")
signal_complex = noisy_signal.astype(np.complex128)
est_freqs = estimate_frequencies_music(
signal_complex, args.fs, config.n_sinusoids, args.n_grids
)
if len(est_freqs) == config.n_sinusoids:
est_amps, est_phases = estimate_amplitudes_phases(
signal_complex, args.fs, est_freqs
)
est_params = SinusoidParameters(est_freqs, est_amps, est_phases)
print_results(true_params, est_params)
else:
print(f"Estimated (MUSIC): {est_freqs}")
print(
f"Warning: Only found {len(est_freqs)} peaks, "
+ f"but expected {config.n_sinusoids}."
)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment