Created
August 25, 2025 03:21
-
-
Save tam17aki/52fb8be15af3e819d781a13d8e544573 to your computer and use it in GitHub Desktop.
A demonstration of parameter estimation for sinusoidal signals using the MUSIC algorithm.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
"""A demonstration of parameter estimation for sinusoidal signals. | |
Frequencies and damping coefficients are estimated using the MUSIC algorithm, | |
followed by amplitude and phase estimation via the least squares method. | |
Copyright (C) 2025 by Akira TAMAMORI | |
Permission is hereby granted, free of charge, to any person obtaining a copy | |
of this software and associated documentation files (the "Software"), to deal | |
in the Software without restriction, including without limitation the rights | |
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
copies of the Software, and to permit persons to whom the Software is | |
furnished to do so, subject to the following conditions: | |
The above copyright notice and this permission notice shall be included in all | |
copies or substantial portions of the Software. | |
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
SOFTWARE. | |
""" | |
import argparse | |
import warnings | |
from dataclasses import dataclass | |
from typing import Self, final | |
import numpy as np | |
import numpy.typing as npt | |
from scipy.linalg import eigh, hankel, pinv | |
from scipy.signal import find_peaks | |
@dataclass(frozen=True) | |
class SinusoidParameters: | |
"""A class to store the parameters of multiple sinusoids.""" | |
frequencies: npt.NDArray[np.float64] | |
amplitudes: npt.NDArray[np.float64] | |
phases: npt.NDArray[np.float64] | |
@dataclass(frozen=True) | |
class ExperimentConfig: | |
"""A class to store the configuration for a signal processing experiment.""" | |
fs: float | |
duration: float | |
snr_db: float | |
freqs_true: npt.NDArray[np.float64] | |
amp_range: tuple[float, float] | |
n_grids: int | |
@property | |
def n_sinusoids(self) -> int: | |
"""Return the number of sinusoids.""" | |
return self.freqs_true.size | |
@final | |
class MusicAnalyzer: | |
"""A class to perform MUSIC-based parameter estimation.""" | |
def __init__(self, fs: float, n_sinusoids: int, n_grids: int = 4192): | |
"""Initialize the analyzer with an experiment configuration. | |
Args: | |
fs (float): Sampling frequency in Hz. | |
n_sinusoids (int): Number of sinusoids. | |
n_grids (int, optional): Number of grid points for MUSIC algorithm. | |
""" | |
self.fs = fs | |
self.n_sinusoids = n_sinusoids | |
self.n_grids = n_grids | |
self.est_params: SinusoidParameters | None = None | |
def fit(self, signal: npt.NDArray[np.complex128]) -> Self: | |
"""Run the full parameter estimation process. | |
Args: | |
signal (np.ndarray): Input signal (complex128). | |
Returns: | |
self (object): The fitted analyzer. | |
""" | |
# Estimate frequencies and decay rates using the MUSIC algorithm. | |
freqs = self._estimate_frequencies(signal) | |
if freqs.size != self.n_sinusoids: | |
self.est_params = SinusoidParameters(freqs, np.array([]), np.array([])) | |
warnings.warn( | |
f"Expected {self.n_sinusoids} components, but found {freqs.size}." | |
) | |
return self | |
# Estimate amplitudes and phases via the least squares method. | |
amps, phases = self._estimate_amplitudes_phases(signal, freqs) | |
self.est_params = SinusoidParameters(freqs, amps, phases) | |
return self | |
@property | |
def frequencies(self) -> npt.NDArray[np.float64]: | |
"""Return the estimated frequencies in Hz after fitting.""" | |
if self.est_params is None: | |
raise AttributeError("Cannot access 'frequencies' before running fit().") | |
return self.est_params.frequencies | |
@property | |
def amplitudes(self) -> npt.NDArray[np.float64]: | |
"""Return the estimated amplitudes after fitting.""" | |
if self.est_params is None or self.est_params.amplitudes.size == 0: | |
raise AttributeError( | |
"Cannot access 'amplitudes' before fitting is complete." | |
) | |
return self.est_params.amplitudes | |
@property | |
def phases(self) -> npt.NDArray[np.float64]: | |
"""Return the estimated phases in radians after fitting.""" | |
if self.est_params is None or self.est_params.phases.size == 0: | |
raise AttributeError("Cannot access 'phases' before fitting is complete.") | |
return self.est_params.phases | |
@staticmethod | |
def _build_covariance_matrix( | |
signal: npt.NDArray[np.complex128], subspace_dim: int | |
) -> npt.NDArray[np.complex128]: | |
"""Build the covariance matrix from the input signal.""" | |
n_samples = signal.size | |
n_snapshots = n_samples - subspace_dim + 1 | |
hankel_matrix = hankel(signal[:subspace_dim], signal[subspace_dim - 1 :]) | |
_cov_matrix = (hankel_matrix @ hankel_matrix.conj().T) / n_snapshots | |
cov_matrix: npt.NDArray[np.complex128] = _cov_matrix.astype(np.complex128) | |
return cov_matrix | |
def _estimate_noise_subspace( | |
self, signal: npt.NDArray[np.complex128], subspace_dim: int, model_order: int | |
) -> npt.NDArray[np.complex128] | None: | |
"""Estimate the signal subspace using eigenvalue decomposition.""" | |
# 1. Build the covariance matrix | |
cov_matrix = self._build_covariance_matrix(signal, subspace_dim) | |
# 2. Eigenvalue decomposition | |
try: | |
_, eigenvectors = eigh(cov_matrix) | |
except np.linalg.LinAlgError: | |
warnings.warn("Eigenvalue decomposition on covariance matrix failed.") | |
return None | |
# The noise subspace is the set of vectors corresponding to the smaller | |
# eigenvalues. | |
# Since it is in ascending order, select (subspace_dim - model_order) vectors | |
# from the beginning | |
n_noise_vectors = subspace_dim - model_order | |
_subspace = eigenvectors[:, :n_noise_vectors] | |
noise_subspace: npt.NDArray[np.complex128] = _subspace.astype(np.complex128) | |
return noise_subspace | |
def _calculate_music_spectrum( | |
self, | |
subspace_dim: int, | |
noise_subspace: npt.NDArray[np.complex128], | |
) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]: | |
"""Calculate the MUSIC pseudospectrum over a frequency grid.""" | |
freq_grid: npt.NDArray[np.float64] = np.linspace( | |
0, self.fs / 2, num=self.n_grids, dtype=np.float64 | |
) | |
music_spectrum = np.zeros(freq_grid.size) | |
# G*G^H only needs to be calculated once | |
projector_onto_noise = noise_subspace @ noise_subspace.conj().T | |
for i, f in enumerate(freq_grid): | |
omega = 2 * np.pi * f / self.fs | |
# Calculate a steering vector a(ω) | |
steering_vector = np.exp(-1j * omega * np.arange(subspace_dim)) | |
# Calculate the denominator a^H * (G*G^H) * a | |
steering_vector_h = steering_vector.conj() | |
denominator = steering_vector_h @ projector_onto_noise @ steering_vector | |
# Add a small value to avoid division by zero | |
music_spectrum[i] = 1 / (np.abs(denominator) + 1e-12) | |
return freq_grid, music_spectrum | |
def _find_music_peaks( | |
self, | |
freq_grid: npt.NDArray[np.float64], | |
music_spectrum: npt.NDArray[np.float64], | |
) -> npt.NDArray[np.float64]: | |
"""Find the N strongest peaks from the MUSIC spectrum.""" | |
# 1. Find all "local maxima" as peak candidates without using prominence. | |
# Ignores extremely small noise floor fluctuations. | |
all_peaks, _ = find_peaks(music_spectrum, height=np.mean(music_spectrum)) | |
all_peaks = np.array(all_peaks, dtype=np.int64) | |
if all_peaks.size < self.n_sinusoids: | |
return freq_grid[all_peaks] if all_peaks.size > 0 else np.array([]) | |
# 2. From all the peak candidates found, select N peaks | |
# with the highest spectral values. | |
strongest_peak_indices = all_peaks[ | |
np.argsort(music_spectrum[all_peaks])[-self.n_sinusoids :] | |
] | |
estimated_freqs = freq_grid[strongest_peak_indices] | |
return np.sort(estimated_freqs) | |
def _estimate_frequencies( | |
self, signal: npt.NDArray[np.complex128] | |
) -> npt.NDArray[np.float64]: | |
"""Estimate frequencies of multiple sinusoids. | |
Args: | |
signal (np.ndarray): | |
Input signal (complex128). | |
Returns: | |
np.ndarray: Estimated frequencies in Hz (float64). | |
Returns empty arrays if estimation fails. | |
""" | |
n_samples = signal.size | |
subspace_dim = n_samples // 3 | |
model_order = 2 * self.n_sinusoids | |
if subspace_dim <= model_order: | |
warnings.warn( | |
"Invalid subspace dimension for MUSIC. Returning empty result." | |
) | |
return np.array([]) | |
# 1. Estimate the noise subspace | |
noise_subspace = self._estimate_noise_subspace( | |
signal, subspace_dim, model_order | |
) | |
if noise_subspace is None: | |
warnings.warn("Failed to estimate noise subspace. Returning empty result.") | |
return np.array([]) | |
# 2. Calculate the MUSIC spectrum | |
freq_grid, music_spectrum = self._calculate_music_spectrum( | |
subspace_dim, noise_subspace | |
) | |
# 3. Detecting peaks from a spectrum | |
estimated_freqs = self._find_music_peaks(freq_grid, music_spectrum) | |
return estimated_freqs | |
def _estimate_amplitudes_phases( | |
self, | |
signal: npt.NDArray[np.complex128], | |
estimated_freqs: npt.NDArray[np.float64], | |
) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]: | |
"""Estimate amplitudes and phases from frequencies using least squares. | |
Args: | |
signal (np.ndarray): Input signal (complex128). | |
estimated_freqs (np.ndarray): Array of estimated frequencies in Hz. | |
Returns: | |
tuple[np.ndarray, np.ndarray]: | |
- estimated_amps (np.ndarray): Estimated amplitudes. | |
- estimated_phases (np.ndarray): Estimated phases in radians. | |
""" | |
n_samples = signal.size | |
n_sinusoids = estimated_freqs.size | |
# 1. Build the Vandermonde matrix V | |
t = np.arange(n_samples) / self.fs | |
vandermonde_matrix = np.zeros((n_samples, n_sinusoids), dtype=np.complex128) | |
for i, freq in enumerate(estimated_freqs): | |
vandermonde_matrix[:, i] = np.exp(2j * np.pi * freq * t) | |
# 2. Solve for complex amplitudes c using pseudo-inverse | |
# y = V @ c => c = pinv(V) @ y | |
try: | |
complex_amps = pinv(vandermonde_matrix) @ signal | |
except np.linalg.LinAlgError: | |
warnings.warn("Least squares estimation for amplitudes/phases failed.") | |
return np.array([]), np.array([]) | |
# 3. Extract amplitudes and phases | |
# For a real-valued sinusoid A*cos(2*pi*f*t + phi), the complex amplitude | |
# estimated using only the positive frequency is (A/2)*exp(j*phi). | |
# Therefore, we need to multiply the magnitude by 2. | |
estimated_amps = 2 * np.abs(complex_amps).astype(np.float64) | |
estimated_phases = np.angle(complex_amps).astype(np.float64) | |
# Sort results according to frequency for consistent comparison | |
sort_indices = np.argsort(estimated_freqs) | |
return estimated_amps[sort_indices], estimated_phases[sort_indices] | |
def _generate_amps_phases( | |
amp_range: tuple[float, float], | |
n_sinusoids: int, | |
rng: np.random.Generator | None = None, | |
) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]: | |
"""Generate amplitudes and phases for multiple sinusoids. | |
Args: | |
amp_range (tuple of float64): Lower and upper bound for amplitude. | |
n_sinusoids (int): Number of sinusoids. | |
rng (np.random.Generator, optional): Random generator. | |
Returns: | |
tuple[np.ndarray, np.ndarray]: | |
- amps (np.ndarray of float64): Random amplitudes assigned to each sinusoid. | |
- phases (np.ndarray of float64): Random phases assigned to each sinus | |
""" | |
if rng is None: | |
rng = np.random.default_rng() | |
amps = rng.uniform(amp_range[0], amp_range[1], n_sinusoids).astype(np.float64) | |
phases = rng.uniform(-np.pi, np.pi, n_sinusoids).astype(np.float64) | |
return amps, phases | |
def create_true_parameters( | |
config: ExperimentConfig, rng: np.random.Generator | None = None | |
) -> SinusoidParameters: | |
"""Create a SinusoidParameters object with true values from the config. | |
Args: | |
config (ExperimentConfig): Configuration of the experiment. | |
rng (np.random.Generator, optional): Random generator. | |
Returns: | |
SinusoidParameters: Parameters of multiple sinusoids. | |
""" | |
# Generate the random parts of the parameters | |
amps_true, phases_true = _generate_amps_phases( | |
config.amp_range, config.n_sinusoids, rng | |
) | |
# Combine fixed parts (from config) and random parts | |
true_params = SinusoidParameters( | |
frequencies=config.freqs_true, amplitudes=amps_true, phases=phases_true | |
) | |
return true_params | |
def synthesize_sinusoids( | |
fs: float, duration: float, params: SinusoidParameters | |
) -> npt.NDArray[np.float64]: | |
"""Generate a clean signal from multiple sinusoids. | |
Args: | |
fs (float): Sampling frequency in Hz. | |
duration (float): Signal duration in seconds. | |
params (SinusoidParameters): Parametes of mutiple sinusoids. | |
Returns: | |
clean_signal (np.ndarray): Sum of multiple sinusoids (float64). | |
""" | |
t = np.linspace(0, duration, int(fs * duration), endpoint=False) | |
clean_signal = np.zeros(t.size, dtype=np.float64) | |
for f, a, p in zip(params.frequencies, params.amplitudes, params.phases): | |
clean_signal += a * np.cos(2 * np.pi * f * t + p) | |
return clean_signal | |
def add_awgn( | |
signal: npt.NDArray[np.float64], | |
snr_db: float, | |
rng: np.random.Generator | None = None, | |
) -> npt.NDArray[np.float64]: | |
"""Add Additive White Gaussian Noise (AWGN) to a given signal. | |
Args: | |
signal (np.ndarray): Input clean signal. | |
snr_db (float): Target signal-to-noise ratio in dB. | |
rng (np.random.Generator, optional): Random generator. | |
Returns: | |
np.ndarray: Noisy signal with specified SNR. | |
""" | |
if rng is None: | |
rng = np.random.default_rng() | |
signal_power = np.var(signal) | |
noise_power = signal_power / (10 ** (snr_db / 10)) | |
noise = rng.normal(0.0, np.sqrt(noise_power), signal.size) | |
return signal + noise | |
def generate_test_signal( | |
fs: float, duration: float, snr_db: float, params: SinusoidParameters | |
) -> npt.NDArray[np.float64]: | |
"""Generate a noisy test signal consisting of multiple sinusoids and AWGN. | |
Args: | |
fs (float): Sampling frequency in Hz. | |
duration (float): Signal duration in seconds. | |
snr_db (float): Target signal-to-noise ratio in dB. | |
params (SinusoidParameters): Parametes of mutiple sinusoids. | |
Returns: | |
noisy_signal (np.ndarray): Generated test signal (float64). | |
""" | |
clean_signal = synthesize_sinusoids(fs, duration, params) | |
noisy_signal = add_awgn(clean_signal, snr_db) | |
return noisy_signal | |
def print_experiment_setup( | |
config: ExperimentConfig, true_params: SinusoidParameters | |
) -> None: | |
"""Print the setup of the experiment.""" | |
sort_indices = np.argsort(true_params.frequencies) | |
print("--- Experiment Setup ---") | |
print(f"Sampling Frequency: {config.fs} Hz") | |
print(f"Signal Duration: {config.duration * 1000:.0f} ms") | |
print(f"True Frequencies: {true_params.frequencies[sort_indices]} Hz") | |
print(f"True Amplitudes: {true_params.amplitudes[sort_indices]}") | |
print(f"True Phases: {true_params.phases[sort_indices]} rad") | |
print(f"SNR: {config.snr_db} dB") | |
print(f"Number of Grid Points: {config.n_grids}") | |
def print_results( | |
analyzer: MusicAnalyzer, | |
true_params: SinusoidParameters, | |
) -> None: | |
"""Print the results.""" | |
if analyzer.est_params is None: | |
print("MusicAnalyzer is not fitted.") | |
return | |
if analyzer.frequencies.size != true_params.frequencies.size: | |
print( | |
"Estimation incomplete or failed. " | |
+ f"Found {analyzer.frequencies.size} components." | |
) | |
print(f"Est Frequencies: {analyzer.frequencies} Hz") | |
return | |
print("\n--- Estimation Results ---") | |
print(f"Est Frequencies: {analyzer.frequencies} Hz") | |
print(f"Est Amplitudes: {analyzer.amplitudes}") | |
print(f"Est Phases: {analyzer.phases} rad") | |
sort_indices = np.argsort(true_params.frequencies) | |
freq_errors = analyzer.frequencies - true_params.frequencies[sort_indices] | |
amp_errors = analyzer.amplitudes - true_params.amplitudes[sort_indices] | |
phase_errors = analyzer.phases - true_params.phases[sort_indices] | |
print("\n--- Estimation Errors ---") | |
print(f"Freq Errors: {freq_errors} Hz") | |
print(f"Amp Errors: {amp_errors}") | |
print(f"Phase Errors: {phase_errors} rad\n") | |
def parse_args() -> argparse.Namespace: | |
"""Parse command-line arguments for MUSIC demo.""" | |
parser = argparse.ArgumentParser( | |
description="Parameter estimation demo using MUSIC algorithm." | |
) | |
parser.add_argument( | |
"--fs", | |
type=float, | |
default=44100.0, | |
help="Sampling frequency in Hz (default: 44100.0)", | |
) | |
parser.add_argument( | |
"--duration", | |
type=float, | |
default=0.1, | |
help="Signal duration in seconds (default: 0.1)", | |
) | |
parser.add_argument( | |
"--snr_db", | |
type=float, | |
default=30.0, | |
help="Signal-to-noise ratio in dB (default: 30.0)", | |
) | |
parser.add_argument( | |
"--freqs_true", | |
type=float, | |
nargs="+", | |
default=[440.0, 460.0, 480.0], | |
help="List of true frequencies in Hz (space separated). " | |
+ "Default: 440.0 460.0 480.0", | |
) | |
parser.add_argument( | |
"--amp_range", | |
type=float, | |
nargs=2, | |
default=[0.2, 1.2], | |
metavar=("AMP_MIN", "AMP_MAX"), | |
help="Amplitude range for sinusoid generation (default: 0.2 1.2)", | |
) | |
parser.add_argument( | |
"--n_grids", | |
type=int, | |
default=4192, | |
help="Number of frequency grid points for MUSIC spectrum (default: 4192)", | |
) | |
return parser.parse_args() | |
def main() -> None: | |
"""Perform demonstration.""" | |
args = parse_args() | |
config = ExperimentConfig( | |
fs=args.fs, | |
duration=args.duration, | |
snr_db=args.snr_db, | |
freqs_true=np.array(args.freqs_true, dtype=np.float64), | |
amp_range=tuple(args.amp_range), | |
n_grids=args.n_grids, | |
) | |
# Generate test signals (sum of multiple sinusoids with additive noise) | |
true_params = create_true_parameters(config) | |
noisy_signal = generate_test_signal( | |
config.fs, config.duration, config.snr_db, true_params | |
) | |
# Print the experiment setup | |
print_experiment_setup(config, true_params) | |
# Perform parameter estimation | |
print("\n--- Running MUSIC ---") | |
analyzer = MusicAnalyzer(config.fs, config.n_sinusoids, config.n_grids) | |
analyzer.fit(noisy_signal.astype(np.complex128)) | |
# Print results | |
print_results(analyzer, true_params) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment