Skip to content

Instantly share code, notes, and snippets.

@jaggzh
Created October 8, 2023 22:01
Show Gist options
  • Save jaggzh/0c6f9bbfbb9935ba8eef70d8c06bfc5b to your computer and use it in GitHub Desktop.
Save jaggzh/0c6f9bbfbb9935ba8eef70d8c06bfc5b to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# gist-paste -u https://gist.github.com/jaggzh/5c345fc072ddf5a6199ba19ac6112001
import numpy as np
import matplotlib.pyplot as plt
from audioproc import *
from utils import *
import matplotlib.pyplot as plt
import matplotlib.animation as animation
def plot_find_peaks(energy, *, num_peaks, winsize):
"""
Plot energy signal,
overlay with energy wins,
mark candidate points with vlines.
energy: 1d numpy array of energies
num_peaks: Int, num peaks to find
winsize: Int, win sizes for energy bundles
"""
en_len = len(energy)
spacing = en_len // (num_peaks)
cands = np.arange(spacing, en_len - spacing, spacing)
fig, ax = plt.subplots()
ax.set_ylim(0, np.max(energy) + 0.1)
# Building initial list of energy windows
wins_start = np.arange(0, en_len - winsize, winsize)
en_wins = []
for start in wins_start:
end = min(start + winsize, en_len)
en_wins.append({'st':start, 'en':end, 'e':energy[start:end], 'done':False})
# Sorting energy windows by energy (desc) and creating index list
en_wins_idx_srt = sorted(range(len(en_wins)), key=lambda x: np.sum(en_wins[x]['e']), reverse=True)
# Remaining candidates
rem_cands = cands.copy()
# Final lists
en_wins_final = []
final_cands = []
eaten_cands = []
def update(frame, ani):
nonlocal en_wins_idx_srt, rem_cands, en_wins_final, final_cands, eaten_cands, cands
ax.clear()
ax.plot(energy, label='Energy')
# Displaying energy windows
for win in en_wins:
if not win['done']:
#ax.fill_between(np.arange(win['st'], win['en']), win['e'], color='brown', alpha=(win['e'].max() / energy.max()) * 0.5)
# ax.fill_between(np.arange(win['st'], win['en']), win['e'], color='brown', alpha=np.clip((win['e'].max() / energy.max()) ** .5, 0, 1))
ax.fill_between(np.arange(win['st'], win['en']), win['e'],
color='brown',
alpha=np.clip(np.sqrt(win['e'].max() / energy.max()), 0.1, 1))
# Displaying candidate points
for cand in cands:
ax.axvline(x=cand, color='cornflowerblue', linewidth=2, alpha=0.5)
for cand in eaten_cands:
ax.axvline(x=cand, color='silver', linewidth=2, alpha=0.5)
# Logic for finding nearest candidate and updating plot
if frame < len(en_wins_idx_srt):
win_idx = en_wins_idx_srt[frame]
win = en_wins[win_idx]
if not win['done']:
for cand in rem_cands:
if win['st'] - spacing//2 <= cand <= win['en'] + spacing//2:
# Find the coordinate of the energy peak within the window
peak_coord = np.argmax(win['e']) + win['st']
# Add to final lists and remove from original lists
en_wins_final.append(win)
final_cands.append(peak_coord)
rem_cands = rem_cands[rem_cands != cand]
eaten_cands.append(cand)
cands = cands[cands != cand]
# Marking the energy window as done
win['done'] = True
# Marking neighbor windows as done
if win_idx>0: en_wins[win_idx-1]['done'] = True
if win_idx<len(en_wins)-1: en_wins[win_idx+1]['done'] = True
# Mark windows as done if adjacent candidate is done
# Function to mark all windows between two coordinates as done
def mark_windows_done(start, end):
for w in en_wins:
if start <= w['st'] <= end or start <= w['en'] <= end:
w['done'] = True
# Check both left and right from the peak_coord
for direction in [-1, 1]:
check_coord = peak_coord + direction
# Continue until final_cand or rem_cand or go out of bounds
while check_coord >= 0 and check_coord < en_len:
if check_coord in final_cands:
# If find a final_cand, mark all windows
# between as done
mark_windows_done(min(check_coord, peak_coord), max(check_coord, peak_coord))
break
elif check_coord in rem_cands:
# If we find a rem_cand, do not mark
break
check_coord += direction
break
else:
# Stop the animation when it's no longer processing anything
ani.event_source.stop()
# Displaying final candidate points
for final_cand in final_cands:
ax.axvline(x=final_cand, color='darkolivegreen', linewidth=3)
ax.set_title('Energy Peaks Plot')
ax.set_xlabel('Time')
ax.set_ylabel('Energy')
ax.legend()
ani = animation.FuncAnimation(fig, update, frames=len(en_wins_idx_srt), fargs=(None,), interval=10, repeat=False)
plt.show()
def nothing():
pass
if __name__ == "__main__":
file=sys.argv[1]
au, osr = read_audio(file)
#au = au[:osr*880] # crop
sr=2000
pf("Downsampling")
au = downsample_audio(au, sr=osr, tsr=sr)
pf("Calc energy")
energy = calculate_energy(au, sr*2, bludgeon=.2) # 2s windows. 'scale' energies
# energy = np.random.rand(1000) # Example energy array
# Parameters
num_peaks = 20
winsize = sr*2
pf("Finding peaks")
plot_find_peaks(energy, num_peaks=num_peaks, winsize=winsize)
#!/usr/bin/env python3
# gist-paste -u https://gist.github.com/jaggzh/876165693ba548ef6eedb9df8efabd3f
import numpy as np
import matplotlib.pyplot as plt
import sys
from audioproc import *
def find_peaks(energy, *, num_peaks, winsize):
en_len = len(energy)
spacing = en_len // num_peaks
cands = np.arange(spacing, en_len - spacing, spacing)
# Building initial list of energy windows
wins_start = np.arange(0, en_len - winsize, winsize)
en_wins = [{'st': start, 'en': min(start + winsize, en_len), 'e': energy[start:start + winsize], 'done': False}
for start in wins_start]
# Sorting energy windows by energy (desc) and creating index list
en_wins_idx_srt = sorted(range(len(en_wins)), key=lambda x: np.sum(en_wins[x]['e']), reverse=True)
# Remaining candidates
rem_cands = cands.copy()
# Final lists
final_cands = []
for win_idx in en_wins_idx_srt:
win = en_wins[win_idx]
if not win['done']:
for cand in rem_cands:
if win['st'] - spacing//2 <= cand <= win['en'] + spacing//2:
# Find the coordinate of the energy peak within the window
peak_coord = np.argmax(win['e']) + win['st']
# Add to final lists and remove from original lists
final_cands.append(peak_coord)
rem_cands = rem_cands[rem_cands != cand]
# Marking the energy window as done
win['done'] = True
# Function to mark all windows between two coordinates as done
def mark_windows_done(start, end):
for w in en_wins:
if start <= w['st'] <= end or start <= w['en'] <= end:
w['done'] = True
# Check both left and right from the peak_coord
for direction in [-1, 1]:
check_coord = peak_coord + direction
# Continue until final_cand or rem_cand or go out of bounds
while 0 <= check_coord < en_len:
if check_coord in final_cands:
mark_windows_done(min(check_coord, peak_coord), max(check_coord, peak_coord))
break
elif check_coord in rem_cands:
break
check_coord += direction
break
return final_cands
def plot_find_peaks(energy, num_peaks, winsize):
peaks = find_peaks(energy, num_peaks=num_peaks, winsize=winsize)
plt.plot(energy, label='Energy')
plt.vlines(peaks, ymin=0, ymax=np.max(energy), colors='darkblue', label='Peaks')
plt.legend()
plt.xlabel('Time')
plt.ylabel('Energy')
plt.title('Energy and Detected Peaks')
plt.show()
if __name__ == "__main__":
file=sys.argv[1]
au, osr = read_audio(file)
au = au[:osr*480] # crop
sr=2000
pf("Downsampling")
au = downsample_audio(au, sr=osr, tsr=sr)
pf("Calc energy")
energy = calculate_energy(au, sr*2, bludgeon=.1) # 2s windows
# energy = np.random.rand(1000) # Example energy array
# Parameters
num_peaks = 20
winsize = sr*2
pf("Finding peaks")
plot_find_peaks(energy, num_peaks=num_peaks, winsize=winsize)
soundfile
fastdtw
scipy
import inspect
import sys
def lineno(): return inspect.currentframe().f_back.f_lineno
def pf(*x,**y): print(*x, **y, flush=True)
def pfp(*x,**y): print(*x, **y, flush=True, sep='')
def pfpl(*x,**y): print(*x, **y, flush=True, sep='', end='')
def pe(*x,**y): print(*x, **y, file=sys.stderr)
# vim: et
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment