Created
October 8, 2023 22:01
-
-
Save jaggzh/0c6f9bbfbb9935ba8eef70d8c06bfc5b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# gist-paste -u https://gist.github.com/jaggzh/5c345fc072ddf5a6199ba19ac6112001 | |
import numpy as np | |
import matplotlib.pyplot as plt | |
from audioproc import * | |
from utils import * | |
import matplotlib.pyplot as plt | |
import matplotlib.animation as animation | |
def _nearest_on_side(values, origin, direction):
    """Return the element of *values* closest to *origin* on one side only.

    direction is -1 (strictly left of origin) or +1 (strictly right).
    Returns None when no element lies on that side.
    """
    side = [v for v in values if (v - origin) * direction > 0]
    return min(side, key=lambda v: abs(v - origin)) if side else None


def plot_find_peaks(energy, *, num_peaks, winsize):
    """
    Animate the peak search over an energy signal.

    Plots the energy signal, shades the energy windows that are still open,
    and marks candidate points with vertical lines. Each animation frame
    handles one window, visiting windows in descending order of summed
    energy. A window that is still open claims the first remaining candidate
    lying within ``spacing // 2`` of its extent; the argmax inside that
    window becomes a final peak, and the window plus its two neighbours are
    closed.

    energy:    1d numpy array of energies
    num_peaks: Int, number of evenly spaced divisions used to place
               candidates (candidates sit at the interior division points)
    winsize:   Int, window size for energy bundles

    Raises ValueError for an empty signal or non-positive num_peaks/winsize.
    """
    if num_peaks < 1:
        raise ValueError("num_peaks must be >= 1")
    if winsize < 1:
        raise ValueError("winsize must be >= 1")
    energy = np.asarray(energy)
    en_len = len(energy)
    if en_len == 0:
        raise ValueError("energy must not be empty")

    spacing = en_len // num_peaks
    if spacing:
        cands = np.arange(spacing, en_len - spacing, spacing)
    else:
        # Fewer samples than requested divisions: no candidates can be placed.
        cands = np.array([], dtype=int)
    reach = spacing // 2

    e_max = float(np.max(energy))
    y_top = e_max + 0.1

    fig, ax = plt.subplots()

    # Build the energy windows. Stepping up to en_len (not en_len - winsize)
    # keeps the final, possibly shorter, window instead of dropping it.
    en_wins = [
        {'st': st, 'en': min(st + winsize, en_len),
         'e': energy[st:st + winsize], 'done': False}
        for st in range(0, en_len, winsize)
    ]
    # Window indices ordered by summed energy, highest first.
    en_wins_idx_srt = sorted(range(len(en_wins)),
                             key=lambda i: np.sum(en_wins[i]['e']),
                             reverse=True)

    rem_cands = cands.copy()   # candidates not yet claimed by a window
    final_cands = []           # accepted peak coordinates
    eaten_cands = []           # candidates already claimed

    def mark_windows_done(start, end):
        """Close every window with an edge inside [start, end]."""
        for w in en_wins:
            if start <= w['st'] <= end or start <= w['en'] <= end:
                w['done'] = True

    def win_alpha(win):
        """Shade opacity from the window's peak relative to the global max."""
        ratio = win['e'].max() / e_max if e_max > 0 else 0.0
        return float(np.clip(np.sqrt(max(ratio, 0.0)), 0.1, 1))

    def consume(win_idx):
        """Let one window claim a candidate, if it is open and one is near."""
        nonlocal rem_cands, cands
        win = en_wins[win_idx]
        if win['done']:
            return
        for cand in rem_cands:
            if not (win['st'] - reach <= cand <= win['en'] + reach):
                continue
            # Coordinate of the energy peak within the window
            peak_coord = np.argmax(win['e']) + win['st']
            final_cands.append(peak_coord)
            eaten_cands.append(cand)
            rem_cands = rem_cands[rem_cands != cand]
            cands = cands[cands != cand]
            # Close this window and its immediate neighbours
            win['done'] = True
            if win_idx > 0:
                en_wins[win_idx - 1]['done'] = True
            if win_idx < len(en_wins) - 1:
                en_wins[win_idx + 1]['done'] = True
            # On each side: if the nearest marker is an accepted peak (not a
            # still-open candidate), close all windows between the two peaks.
            for direction in (-1, 1):
                near_final = _nearest_on_side(final_cands, peak_coord, direction)
                if near_final is None:
                    continue
                near_rem = _nearest_on_side(rem_cands, peak_coord, direction)
                if (near_rem is None
                        or abs(near_final - peak_coord) <= abs(near_rem - peak_coord)):
                    mark_windows_done(min(near_final, peak_coord),
                                      max(near_final, peak_coord))
            # rem_cands was rebound above; stop iterating the old array.
            break

    def update(frame):
        ax.clear()
        # clear() resets the axis limits, so re-apply them every frame.
        ax.set_ylim(0, y_top)
        ax.plot(energy, label='Energy')
        # Displaying energy windows that are still open
        for win in en_wins:
            if not win['done']:
                ax.fill_between(np.arange(win['st'], win['en']), win['e'],
                                color='brown', alpha=win_alpha(win))
        # Displaying candidate points (open, then already claimed)
        for cand in cands:
            ax.axvline(x=cand, color='cornflowerblue', linewidth=2, alpha=0.5)
        for cand in eaten_cands:
            ax.axvline(x=cand, color='silver', linewidth=2, alpha=0.5)
        if frame < len(en_wins_idx_srt):
            consume(en_wins_idx_srt[frame])
        else:
            # Stop the animation when it's no longer processing anything.
            # `ani` is resolved from the enclosing scope at call time.
            ani.event_source.stop()
        # Displaying final candidate points
        for final_cand in final_cands:
            ax.axvline(x=final_cand, color='darkolivegreen', linewidth=3)
        ax.set_title('Energy Peaks Plot')
        ax.set_xlabel('Time')
        ax.set_ylabel('Energy')
        ax.legend()

    # Keep a reference to the animation so it is not garbage collected.
    ani = animation.FuncAnimation(fig, update, frames=len(en_wins_idx_srt),
                                  interval=10, repeat=False)
    plt.show()
def nothing():
    """No-op placeholder: takes no arguments and returns None."""
    return None
if __name__ == "__main__":
    # Imported explicitly: this script otherwise only sees `sys` as a side
    # effect of `from utils import *`.
    import sys

    if len(sys.argv) < 2:
        sys.exit(f"usage: {sys.argv[0]} AUDIO_FILE")
    path = sys.argv[1]

    # NOTE(review): read_audio presumably returns (samples, sample_rate) —
    # confirm against audioproc.
    au, osr = read_audio(path)
    #au = au[:osr*880] # crop
    sr = 2000
    pf("Downsampling")
    au = downsample_audio(au, sr=osr, tsr=sr)
    pf("Calc energy")
    energy = calculate_energy(au, sr*2, bludgeon=.2)  # 2s windows. 'scale' energies
    # energy = np.random.rand(1000)  # Example energy array
    # Parameters
    num_peaks = 20
    winsize = sr*2
    pf("Finding peaks")
    plot_find_peaks(energy, num_peaks=num_peaks, winsize=winsize)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# gist-paste -u https://gist.github.com/jaggzh/876165693ba548ef6eedb9df8efabd3f | |
import numpy as np | |
import matplotlib.pyplot as plt | |
import sys | |
from audioproc import * | |
def _nearest_on_side(values, origin, direction):
    """Return the element of *values* closest to *origin* on one side only.

    direction is -1 (strictly left of origin) or +1 (strictly right).
    Returns None when no element lies on that side.
    """
    side = [v for v in values if (v - origin) * direction > 0]
    return min(side, key=lambda v: abs(v - origin)) if side else None


def _mark_windows_done(en_wins, start, end):
    """Close every window that has an edge inside [start, end]."""
    for w in en_wins:
        if start <= w['st'] <= end or start <= w['en'] <= end:
            w['done'] = True


def find_peaks(energy, *, num_peaks, winsize):
    """Locate energy peaks near evenly spaced candidate positions.

    The signal is cut into consecutive windows of ``winsize`` samples (the
    last one may be shorter). Windows are visited in descending order of
    summed energy; an open window claims the first remaining candidate lying
    within ``spacing // 2`` of its extent, and the argmax inside that window
    is recorded as a peak.

    energy:    1d array of energies
    num_peaks: Int, number of evenly spaced divisions used to place
               candidates (candidates sit at the interior division points,
               so fewer than num_peaks peaks are returned)
    winsize:   Int, window size in samples

    Returns a list of peak indices in the order they were found. Returns an
    empty list when the signal has fewer samples than num_peaks.
    Raises ValueError if num_peaks or winsize is not positive.
    """
    if num_peaks < 1:
        raise ValueError("num_peaks must be >= 1")
    if winsize < 1:
        raise ValueError("winsize must be >= 1")
    energy = np.asarray(energy)
    en_len = len(energy)
    spacing = en_len // num_peaks
    if spacing == 0:
        # Too few samples to place any candidate.
        return []
    cands = np.arange(spacing, en_len - spacing, spacing)
    reach = spacing // 2

    # Build the energy windows. Stepping up to en_len (not en_len - winsize)
    # keeps the final window instead of silently dropping the signal's tail.
    en_wins = [
        {'st': st, 'en': min(st + winsize, en_len),
         'e': energy[st:st + winsize], 'done': False}
        for st in range(0, en_len, winsize)
    ]
    # Window indices ordered by summed energy, highest first.
    en_wins_idx_srt = sorted(range(len(en_wins)),
                             key=lambda i: np.sum(en_wins[i]['e']),
                             reverse=True)

    rem_cands = cands.copy()   # candidates not yet claimed
    final_cands = []           # accepted peak coordinates

    for win_idx in en_wins_idx_srt:
        win = en_wins[win_idx]
        if win['done']:
            continue
        for cand in rem_cands:
            if not (win['st'] - reach <= cand <= win['en'] + reach):
                continue
            # Coordinate of the energy peak within the window
            peak_coord = np.argmax(win['e']) + win['st']
            final_cands.append(peak_coord)
            rem_cands = rem_cands[rem_cands != cand]
            win['done'] = True
            # On each side: if the nearest marker is an accepted peak (not a
            # still-open candidate), close all windows between the two peaks.
            for direction in (-1, 1):
                near_final = _nearest_on_side(final_cands, peak_coord, direction)
                if near_final is None:
                    continue
                near_rem = _nearest_on_side(rem_cands, peak_coord, direction)
                if (near_rem is None
                        or abs(near_final - peak_coord) <= abs(near_rem - peak_coord)):
                    _mark_windows_done(en_wins,
                                       min(near_final, peak_coord),
                                       max(near_final, peak_coord))
            # rem_cands was rebound above; stop iterating the old array.
            break
    return final_cands
def plot_find_peaks(energy, num_peaks, winsize):
    """Plot the energy signal with a vertical line at each detected peak.

    Peaks come from find_peaks(energy, num_peaks=..., winsize=...); the
    lines span from 0 up to the maximum of *energy*. Blocks in plt.show().
    """
    peak_positions = find_peaks(energy, num_peaks=num_peaks, winsize=winsize)

    # Signal first, then the peak markers on top of it.
    plt.plot(energy, label='Energy')
    line_top = np.max(energy)
    plt.vlines(
        peak_positions,
        ymin=0,
        ymax=line_top,
        colors='darkblue',
        label='Peaks',
    )

    plt.legend()
    plt.xlabel('Time')
    plt.ylabel('Energy')
    plt.title('Energy and Detected Peaks')
    plt.show()
if __name__ == "__main__":
    if len(sys.argv) < 2:
        sys.exit(f"usage: {sys.argv[0]} AUDIO_FILE")
    path = sys.argv[1]

    # NOTE(review): `pf` is not imported by name in this script; presumably
    # it is re-exported by `from audioproc import *` — verify.
    # NOTE(review): read_audio presumably returns (samples, sample_rate) —
    # confirm against audioproc.
    au, osr = read_audio(path)
    au = au[:osr*480]  # crop
    sr = 2000
    pf("Downsampling")
    au = downsample_audio(au, sr=osr, tsr=sr)
    pf("Calc energy")
    energy = calculate_energy(au, sr*2, bludgeon=.1)  # 2s windows
    # energy = np.random.rand(1000)  # Example energy array
    # Parameters
    num_peaks = 20
    winsize = sr*2
    pf("Finding peaks")
    plot_find_peaks(energy, num_peaks=num_peaks, winsize=winsize)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
soundfile | |
fastdtw | |
scipy |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import inspect | |
import sys | |
def lineno():
    """Return the line number currently executing in the caller's frame."""
    caller_frame = inspect.currentframe().f_back
    return caller_frame.f_lineno
def pf(*x, **y):
    """print() that flushes by default.

    Accepts the same arguments as print(). ``flush`` defaults to True but
    may be overridden by the caller (previously passing it raised TypeError
    for a duplicated keyword).
    """
    y.setdefault('flush', True)
    print(*x, **y)
def pfp(*x, **y):
    """print() with no separator between arguments, flushing by default.

    Accepts the same arguments as print(). ``sep`` defaults to '' and
    ``flush`` to True; either may be overridden by the caller (previously
    passing them raised TypeError for a duplicated keyword).
    """
    y.setdefault('flush', True)
    y.setdefault('sep', '')
    print(*x, **y)
def pfpl(*x, **y):
    """print() with no separator and no trailing newline, flushing by default.

    Accepts the same arguments as print(). ``sep`` and ``end`` default to ''
    and ``flush`` to True; each may be overridden by the caller (previously
    passing them raised TypeError for a duplicated keyword).
    """
    y.setdefault('flush', True)
    y.setdefault('sep', '')
    y.setdefault('end', '')
    print(*x, **y)
def pe(*x, **y):
    """print() to standard error; other arguments are passed through."""
    err_stream = sys.stderr
    print(*x, file=err_stream, **y)
# vim: et |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment