Skip to content

Instantly share code, notes, and snippets.

@kastnerkyle
Last active September 21, 2019 07:34
Show Gist options
  • Save kastnerkyle/a3661d6be10a0ae9e01fd4299f0a38af to your computer and use it in GitHub Desktop.
Save kastnerkyle/a3661d6be10a0ae9e01fd4299f0a38af to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
# Authors: jfsantos, Kyle Kastner
import sys
import numpy as np
import wave
import scipy as sp
from scipy.io import wavfile
def segment_axis(a, length, overlap=0, axis=None, end='cut', endvalue=0):
"""Generate a new array that chops the given array along the given axis
into overlapping frames.
Originally from scikits.talkbox, author Anne Archibald
example:
>>> segment_axis(arange(10), 4, 2)
array([[0, 1, 2, 3],
[2, 3, 4, 5],
[4, 5, 6, 7],
[6, 7, 8, 9]])
arguments:
a The array to segment
length The length of each frame
overlap The number of array elements by which the frames should overlap
axis The axis to operate on; if None, act on the flattened array
end What to do with the last frame, if the array is not evenly
divisible into pieces. Options are:
'cut' Simply discard the extra values
'wrap' Copy values from the beginning of the array
'pad' Pad with a constant value
endvalue The value to use for end='pad'
The array is not copied unless necessary (either because it is unevenly
strided and being flattened or because end is set to 'pad' or 'wrap').
"""
if axis is None:
a = np.ravel(a) # may copy
axis = 0
l = a.shape[axis]
if overlap >= length:
raise ValueError("frames cannot overlap by more than 100%")
if overlap < 0 or length <= 0:
raise ValueError("overlap must be nonnegative and length must "\
"be positive")
if l < length or (l-length) % (length-overlap):
if l>length:
roundup = length + (1+(l-length)//(length-overlap))*(length-overlap)
rounddown = length + ((l-length)//(length-overlap))*(length-overlap)
else:
roundup = length
rounddown = 0
assert rounddown < l < roundup
assert roundup == rounddown + (length-overlap) \
or (roundup == length and rounddown == 0)
a = a.swapaxes(-1,axis)
if end == 'cut':
a = a[..., :rounddown]
elif end in ['pad','wrap']: # copying will be necessary
s = list(a.shape)
s[-1] = roundup
b = np.empty(s,dtype=a.dtype)
if end in ['pad','wrap']:
b[..., :l] = a
if end == 'pad':
b[..., l:] = endvalue
elif end == 'wrap':
b[..., l:] = a[..., :roundup-l]
a = b
elif end == 'delay':
s = list(a.shape)
l_orig = l
l += overlap
# if l not divisible by length, pad last frame with zeros
if l_orig % (length-overlap):
roundup = length + (1+(l-length)//(length-overlap))*(length-overlap)
else:
roundup = l
s[-1] = roundup
b = np.empty(s,dtype=a.dtype)
b[..., :(overlap)] = endvalue
b[..., (overlap):(l_orig+overlap)] = a
b[..., (l_orig+overlap):] = endvalue
a = b
else:
raise ValueError("end has to be either 'cut', 'pad', 'wrap', or 'delay'.")
a = a.swapaxes(-1,axis)
l = a.shape[axis]
if l == 0:
raise ValueError("Not enough data points to segment array in 'cut' mode; "\
"try 'pad' or 'wrap'")
assert l >= length
assert (l-length) % (length-overlap) == 0
n = 1 + (l-length) // (length-overlap)
s = a.strides[axis]
newshape = a.shape[:axis] + (n,length) + a.shape[axis+1:]
newstrides = a.strides[:axis] + ((length-overlap)*s,s) + a.strides[axis+1:]
try:
return np.ndarray.__new__(np.ndarray, strides=newstrides,
shape=newshape, buffer=a, dtype=a.dtype)
except TypeError:
warnings.warn("Problem with ndarray creation forces copy.")
a = a.copy()
# Shape doesn't change but strides does
newstrides = a.strides[:axis] + ((length-overlap)*s,s) \
+ a.strides[axis+1:]
return np.ndarray.__new__(np.ndarray, strides=newstrides,
shape=newshape, buffer=a, dtype=a.dtype)
def simple_energy_vad(x, fs, framelen=0.02, theta_main=30, theta_min=-55):
'''Simple energy voice activity detection algorithm based on energy
thresholds as described in Tomi Kinnunen and Padmanabhan Rajan, "A
practical, self-adaptive voice activity detector for speaker verification
with noisy telephone and microphone data", ICASSP 2013, Vancouver (NOTE:
this is the benchmark method, not the method proposed by the authors).
By Joao Felipe Santos, as implemented in SRMRPy
'''
# Split signal in frames
framelen = int(framelen * fs)
frames = segment_axis(x, length=framelen, overlap=0, end='pad')
frames_zero_mean = frames - frames.mean(axis=0)
frame_energy = 10*np.log10(1/(framelen-1) * (frames_zero_mean**2).sum(axis=1) + 1e-6)
max_energy = max(frame_energy)
speech_presence = (frame_energy > max_energy - theta_main) & (frame_energy > theta_min)
x_vad = np.zeros_like(x, dtype=bool)
for idx, frame in enumerate(frames):
if speech_presence[idx]:
x_vad[idx*framelen:(idx+1)*framelen] = True
else:
x_vad[idx*framelen:(idx+1)*framelen] = False
return x[x_vad], x_vad
def rolling_window(a, window):
shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
strides = a.strides + (a.strides[-1],)
return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)
def laglead_energy_vad(x, fs, small_win_size=0.25, large_win_size=1.,
up_threshold=1E-4, down_threshold=-1E-3):
'''Simple energy voice activity detection algorithm using lag-lead windows
'''
sz = x.itemsize
shp = x.shape
if len(shp) != 1:
raise ValueError("Input should be a 1D array")
small_win_samples = int(small_win_size * fs)
large_win_samples = int(large_win_size * fs)
small_overlap = rolling_window(np.abs(x), small_win_samples)
small_mean = np.mean(small_overlap, axis=-1)
large_overlap = rolling_window(np.abs(x), large_win_samples)
large_mean = np.mean(large_overlap, axis=-1)
lead_diff = large_mean - small_mean[:len(large_mean)]
lag_diff = large_mean - small_mean[-len(large_mean):]
ups = np.arange(len(lead_diff))[lead_diff > up_threshold]
downs = np.arange(len(lag_diff))[lag_diff < down_threshold] + (len(small_mean) - len(large_mean))
vad = []
is_up = False
ups = list(ups)
downs = list(downs)
for i in range(len(x)):
if is_up:
removing = True
while removing:
if len(ups) == 0:
break
if ups[0] < i:
ups.pop(0)
else:
removing = False
if len(downs) == 0:
vad.append(True)
continue
if i < downs[0]:
vad.append(True)
else:
vad.append(False)
is_up = True
else:
removing = True
while removing:
if len(downs) == 0:
break
if downs[0] < i:
downs.pop(0)
else:
removing = False
if len(ups) == 0:
vad.append(False)
continue
if i < ups[0]:
vad.append(False)
else:
vad.append(True)
is_up = True
vad = np.array(vad)
return x[vad], vad
def read_signal(filename, winsize):
wf=wave.open(filename,'rb')
n=wf.getnframes()
str=wf.readframes(n)
params = ((wf.getnchannels(), wf.getsampwidth(),
wf.getframerate(), wf.getnframes(),
wf.getcomptype(), wf.getcompname()))
siglen=((int )(len(str)/2/winsize) + 1) * winsize
signal=sp.zeros(siglen, sp.int16)
signal[0:len(str)/2] = sp.fromstring(str,sp.int16)
return [signal, params]
def get_frame(signal, winsize, no):
shift=winsize/2
start=no*shift
end = start+winsize
return signal[start:end]
# from jfsantos
class LTSD():
def __init__(self,winsize,window,order):
self.winsize = int(winsize)
self.window = window
self.order = order
self.amplitude = {}
def get_amplitude(self,signal,l):
if self.amplitude.has_key(l):
return self.amplitude[l]
else:
amp = sp.absolute(sp.fft(get_frame(signal, self.winsize,l) * self.window))
self.amplitude[l] = amp
return amp
def compute_noise_avg_spectrum(self, nsignal):
windownum = int(len(nsignal)//(self.winsize//2) - 1)
avgamp = np.zeros(self.winsize)
for l in range(windownum):
avgamp += sp.absolute(sp.fft(get_frame(nsignal, self.winsize,l) * self.window))
return avgamp/float(windownum)
def compute(self,signal):
self.windownum = int(len(signal)//(self.winsize//2) - 1)
ltsds = np.zeros(self.windownum)
#Calculate the average noise spectrum amplitude based 20 frames in the head parts of input signal.
self.avgnoise = self.compute_noise_avg_spectrum(signal[0:self.winsize*20])**2
for l in xrange(self.windownum):
ltsds[l] = self.ltsd(signal,l,5)
return ltsds
def ltse(self,signal,l,order):
maxamp = np.zeros(self.winsize)
for idx in range(l-order,l+order+1):
amp = self.get_amplitude(signal,idx)
maxamp = np.maximum(maxamp,amp)
return maxamp
def ltsd(self,signal,l,order):
if l < order or l+order >= self.windownum:
return 0
return 10.0*np.log10(np.sum(self.ltse(signal,l,order)**2/self.avgnoise)/float(len(self.avgnoise)))
def ltsd_vad(x, fs, threshold=9, winsize=8192):
window = sp.hanning(winsize)
ltsd = LTSD(winsize, window, 5)
s_vad = ltsd.compute(x)
# LTSD is 50% overlap, so each "step" covers 4096 samples
# +1 to cover the extra edge window
n_samples = int(((len(s_vad) + 1) * winsize) // 2)
time_s = n_samples / float(fs)
time_points = np.linspace(0, time_s, len(s_vad))
time_samples = (fs * time_points).astype(np.int32)
time_samples = time_samples
f_vad = np.zeros_like(x, dtype=np.bool)
offset = winsize
for n, (ss, es) in enumerate(zip(time_samples[:-1], time_samples[1:])):
sss = ss - offset
if sss < 0:
sss = 0
ses = es - offset
if ses < 0:
ses = 0
if s_vad[n + 1] < threshold:
f_vad[sss:ses] = False
else:
f_vad[sss:ses] = True
f_vad[ses:] = False
return x[f_vad], f_vad
if __name__=="__main__":
WINSIZE=8192
sound=sys.argv[1]
#signal, params = read_signal(sound,WINSIZE)
fs, d = wavfile.read(sound)
res, vad = ltsd_vad(d, fs, threshold=9)
"""
window = sp.hanning(WINSIZE)
ltsd = LTSD(WINSIZE, window, 5)
res = ltsd.compute(signal)
"""
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(res)
plt.savefig('res.png')
wavfile.write("out.wav", fs, res)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment