Created
October 8, 2023 21:54
-
-
Save jaggzh/5c345fc072ddf5a6199ba19ac6112001 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import numpy as np | |
import matplotlib.pyplot as plt | |
from audioproc import * | |
from utils import * | |
import matplotlib.pyplot as plt | |
import matplotlib.animation as animation | |
def _nearest_on_side(coords, origin, direction):
    """Return the member of *coords* closest to *origin* on one side, or None.

    direction is -1 (strictly left of origin) or +1 (strictly right of it).
    """
    on_side = [int(c) for c in coords if (c - origin) * direction > 0]
    if not on_side:
        return None
    return min(on_side, key=lambda c: abs(c - origin))


def plot_find_peaks(energy, *, num_peaks, winsize):
    """
    Plot energy signal,
    overlay with energy wins,
    mark candidate points with vlines,
    and animate a greedy pairing of windows with candidates.

    The signal is cut into consecutive windows of `winsize` samples (the
    last one may be shorter).  Candidate markers are placed every
    `len(energy) // num_peaks` samples, excluding both ends of the signal.
    One window is visited per animation frame, in order of descending
    summed energy.  A window that is not yet done and has a remaining
    candidate within half a spacing of its span consumes the first such
    candidate; the arg-max of the window becomes a final peak, and the
    window plus its two neighbours are marked done.  If the nearest marker
    to the left/right of the new peak is another final peak (rather than a
    remaining candidate), every window between the two is marked done too.

    Colours: blue = remaining candidate, silver = consumed candidate,
    green = final peak, brown fill = window not yet done.

    energy:    1d numpy array of energies (anything np.asarray accepts)
    num_peaks: Int, num peaks to find (sets the candidate spacing)
    winsize:   Int, win sizes for energy bundles

    Returns None; blocks in plt.show() until the figure is closed.
    Raises ValueError if energy is empty or num_peaks/winsize is < 1.
    """
    energy = np.asarray(energy)
    en_len = len(energy)
    if en_len == 0:
        raise ValueError("energy must not be empty")
    if num_peaks < 1:
        raise ValueError("num_peaks must be >= 1")
    if winsize < 1:
        raise ValueError("winsize must be >= 1")

    # Clamp to 1 so a signal shorter than num_peaks cannot yield a zero step.
    spacing = max(1, en_len // num_peaks)
    half_spacing = spacing // 2
    # Remaining (not yet consumed) candidates, always in ascending order.
    rem_cands = np.arange(spacing, en_len - spacing, spacing)

    # Hoisted: these are invariant across windows and frames.
    energy_peak = float(np.max(energy))
    y_top = energy_peak + 0.1

    fig, ax = plt.subplots()

    # Building initial list of energy windows.  Starting points run to the end
    # of the signal so the (possibly partial) tail window is included; the
    # min() clamp below is what handles that partial window.
    en_wins = []
    for start in range(0, en_len, winsize):
        end = min(start + winsize, en_len)
        seg = energy[start:end]
        # Guard the ratio: an all-zero signal would otherwise give 0/0 = NaN,
        # and a negative ratio would give sqrt() = NaN.
        ratio = max(float(seg.max()) / energy_peak, 0.0) if energy_peak > 0 else 0.0
        en_wins.append({
            'st': start,
            'en': end,
            'e': seg,
            'done': False,
            # Fill opacity scales with the window's peak energy (min 0.1).
            'alpha': float(np.clip(np.sqrt(ratio), 0.1, 1)),
        })

    # Window indices sorted by summed energy (desc); one is visited per frame.
    en_wins_idx_srt = sorted(
        range(len(en_wins)),
        key=lambda i: np.sum(en_wins[i]['e']),
        reverse=True,
    )

    # Final lists
    final_cands = []
    eaten_cands = []
    # Bound to the FuncAnimation below; update() reads it through the closure.
    ani = None

    def mark_windows_done(start, end):
        """Mark every window with an edge inside [start, end] as done."""
        for w in en_wins:
            if start <= w['st'] <= end or start <= w['en'] <= end:
                w['done'] = True

    def consume(win_idx):
        """Pair window win_idx with a remaining candidate, if one is in reach."""
        nonlocal rem_cands
        win = en_wins[win_idx]
        if win['done']:
            return
        lo = win['st'] - half_spacing
        hi = win['en'] + half_spacing
        in_reach = rem_cands[(rem_cands >= lo) & (rem_cands <= hi)]
        if in_reach.size == 0:
            return
        # Only the first (left-most) reachable candidate is consumed.
        cand = int(in_reach[0])

        # Coordinate of the energy peak within the window.
        peak_coord = int(np.argmax(win['e'])) + win['st']
        final_cands.append(peak_coord)
        rem_cands = rem_cands[rem_cands != cand]
        eaten_cands.append(cand)

        # Mark the window and its immediate neighbours as done.
        win['done'] = True
        if win_idx > 0:
            en_wins[win_idx - 1]['done'] = True
        if win_idx < len(en_wins) - 1:
            en_wins[win_idx + 1]['done'] = True

        # Look left and right of the new peak.  If the nearest marker on a
        # side is a final peak, everything between is settled; if it is a
        # remaining candidate, leave those windows alone.  A final peak wins
        # a tie at the same coordinate.
        for direction in (-1, 1):
            near_final = _nearest_on_side(final_cands, peak_coord, direction)
            if near_final is None:
                continue
            near_rem = _nearest_on_side(rem_cands, peak_coord, direction)
            if (near_rem is not None
                    and abs(near_rem - peak_coord) < abs(near_final - peak_coord)):
                continue
            mark_windows_done(min(near_final, peak_coord),
                              max(near_final, peak_coord))

    def update(frame):
        """Redraw the figure and process the window scheduled for *frame*."""
        # With no candidates left nothing can change any more; this frame
        # shows the final state, after which the animation is stopped.
        idle = rem_cands.size == 0

        ax.clear()
        # ax.clear() resets the limits, so they must be re-applied per frame.
        ax.set_ylim(0, y_top)
        ax.plot(energy, label='Energy')

        # Displaying energy windows
        for win in en_wins:
            if not win['done']:
                ax.fill_between(np.arange(win['st'], win['en']), win['e'],
                                color='brown', alpha=win['alpha'])

        # Displaying candidate points
        for cand in rem_cands:
            ax.axvline(x=cand, color='cornflowerblue', linewidth=2, alpha=0.5)
        for cand in eaten_cands:
            ax.axvline(x=cand, color='silver', linewidth=2, alpha=0.5)

        # Logic for finding nearest candidate and updating state
        if not idle and frame < len(en_wins_idx_srt):
            consume(en_wins_idx_srt[frame])

        # Displaying final candidate points
        for final_cand in final_cands:
            ax.axvline(x=final_cand, color='darkolivegreen', linewidth=3)

        ax.set_title('Energy Peaks Plot')
        ax.set_xlabel('Time')
        ax.set_ylabel('Energy')
        ax.legend()

        # Stop the animation when it's no longer processing anything.
        if idle and ani is not None and ani.event_source is not None:
            ani.event_source.stop()

    ani = animation.FuncAnimation(fig, update, frames=len(en_wins_idx_srt),
                                  interval=10, repeat=False)
    plt.show()
def nothing():
    """No-op placeholder: takes no arguments and returns None."""
    return None
if __name__ == "__main__":
    # Script entry point: read an audio file, downsample it, compute its
    # energy signal and show the animated peak search.
    #
    # `sys` is not imported at the top of this file, so import it here rather
    # than relying on one of the star imports to provide it.
    import sys

    if len(sys.argv) < 2:
        sys.exit(f"Usage: {sys.argv[0]} <audio-file>")
    file = sys.argv[1]

    # read_audio / downsample_audio / calculate_energy / pf come from the star
    # imports (audioproc, utils); presumably `osr` is the file's original
    # sample rate -- TODO confirm against those modules.
    au, osr = read_audio(file)
    #au = au[:osr*880] # crop

    sr = 2000
    winsize = sr * 2  # 2s windows at the downsampled rate

    pf("Downsampling")
    au = downsample_audio(au, sr=osr, tsr=sr)

    pf("Calc energy")
    energy = calculate_energy(au, winsize, bludgeon=.2)  # 2s windows. 'scale' energies
    # energy = np.random.rand(1000) # Example energy array

    # Parameters
    num_peaks = 20

    pf("Finding peaks")
    plot_find_peaks(energy, num_peaks=num_peaks, winsize=winsize)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment