Forked from kastnerkyle/run_ltsd_silence_removal.py
Created
September 21, 2019 07:34
-
-
Save vyraun/f59bf6009c4f82907f0cd979d217cfa6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# Authors: jfsantos, Kyle Kastner | |
import sys | |
import numpy as np | |
import wave | |
import scipy as sp | |
from scipy.io import wavfile | |
def segment_axis(a, length, overlap=0, axis=None, end='cut', endvalue=0): | |
"""Generate a new array that chops the given array along the given axis | |
into overlapping frames. | |
Originally from scikits.talkbox, author Anne Archibald | |
example: | |
>>> segment_axis(arange(10), 4, 2) | |
array([[0, 1, 2, 3], | |
[2, 3, 4, 5], | |
[4, 5, 6, 7], | |
[6, 7, 8, 9]]) | |
arguments: | |
a The array to segment | |
length The length of each frame | |
overlap The number of array elements by which the frames should overlap | |
axis The axis to operate on; if None, act on the flattened array | |
end What to do with the last frame, if the array is not evenly | |
divisible into pieces. Options are: | |
'cut' Simply discard the extra values | |
'wrap' Copy values from the beginning of the array | |
'pad' Pad with a constant value | |
endvalue The value to use for end='pad' | |
The array is not copied unless necessary (either because it is unevenly | |
strided and being flattened or because end is set to 'pad' or 'wrap'). | |
""" | |
if axis is None: | |
a = np.ravel(a) # may copy | |
axis = 0 | |
l = a.shape[axis] | |
if overlap >= length: | |
raise ValueError("frames cannot overlap by more than 100%") | |
if overlap < 0 or length <= 0: | |
raise ValueError("overlap must be nonnegative and length must "\ | |
"be positive") | |
if l < length or (l-length) % (length-overlap): | |
if l>length: | |
roundup = length + (1+(l-length)//(length-overlap))*(length-overlap) | |
rounddown = length + ((l-length)//(length-overlap))*(length-overlap) | |
else: | |
roundup = length | |
rounddown = 0 | |
assert rounddown < l < roundup | |
assert roundup == rounddown + (length-overlap) \ | |
or (roundup == length and rounddown == 0) | |
a = a.swapaxes(-1,axis) | |
if end == 'cut': | |
a = a[..., :rounddown] | |
elif end in ['pad','wrap']: # copying will be necessary | |
s = list(a.shape) | |
s[-1] = roundup | |
b = np.empty(s,dtype=a.dtype) | |
if end in ['pad','wrap']: | |
b[..., :l] = a | |
if end == 'pad': | |
b[..., l:] = endvalue | |
elif end == 'wrap': | |
b[..., l:] = a[..., :roundup-l] | |
a = b | |
elif end == 'delay': | |
s = list(a.shape) | |
l_orig = l | |
l += overlap | |
# if l not divisible by length, pad last frame with zeros | |
if l_orig % (length-overlap): | |
roundup = length + (1+(l-length)//(length-overlap))*(length-overlap) | |
else: | |
roundup = l | |
s[-1] = roundup | |
b = np.empty(s,dtype=a.dtype) | |
b[..., :(overlap)] = endvalue | |
b[..., (overlap):(l_orig+overlap)] = a | |
b[..., (l_orig+overlap):] = endvalue | |
a = b | |
else: | |
raise ValueError("end has to be either 'cut', 'pad', 'wrap', or 'delay'.") | |
a = a.swapaxes(-1,axis) | |
l = a.shape[axis] | |
if l == 0: | |
raise ValueError("Not enough data points to segment array in 'cut' mode; "\ | |
"try 'pad' or 'wrap'") | |
assert l >= length | |
assert (l-length) % (length-overlap) == 0 | |
n = 1 + (l-length) // (length-overlap) | |
s = a.strides[axis] | |
newshape = a.shape[:axis] + (n,length) + a.shape[axis+1:] | |
newstrides = a.strides[:axis] + ((length-overlap)*s,s) + a.strides[axis+1:] | |
try: | |
return np.ndarray.__new__(np.ndarray, strides=newstrides, | |
shape=newshape, buffer=a, dtype=a.dtype) | |
except TypeError: | |
warnings.warn("Problem with ndarray creation forces copy.") | |
a = a.copy() | |
# Shape doesn't change but strides does | |
newstrides = a.strides[:axis] + ((length-overlap)*s,s) \ | |
+ a.strides[axis+1:] | |
return np.ndarray.__new__(np.ndarray, strides=newstrides, | |
shape=newshape, buffer=a, dtype=a.dtype) | |
def simple_energy_vad(x, fs, framelen=0.02, theta_main=30, theta_min=-55): | |
'''Simple energy voice activity detection algorithm based on energy | |
thresholds as described in Tomi Kinnunen and Padmanabhan Rajan, "A | |
practical, self-adaptive voice activity detector for speaker verification | |
with noisy telephone and microphone data", ICASSP 2013, Vancouver (NOTE: | |
this is the benchmark method, not the method proposed by the authors). | |
By Joao Felipe Santos, as implemented in SRMRPy | |
''' | |
# Split signal in frames | |
framelen = int(framelen * fs) | |
frames = segment_axis(x, length=framelen, overlap=0, end='pad') | |
frames_zero_mean = frames - frames.mean(axis=0) | |
frame_energy = 10*np.log10(1/(framelen-1) * (frames_zero_mean**2).sum(axis=1) + 1e-6) | |
max_energy = max(frame_energy) | |
speech_presence = (frame_energy > max_energy - theta_main) & (frame_energy > theta_min) | |
x_vad = np.zeros_like(x, dtype=bool) | |
for idx, frame in enumerate(frames): | |
if speech_presence[idx]: | |
x_vad[idx*framelen:(idx+1)*framelen] = True | |
else: | |
x_vad[idx*framelen:(idx+1)*framelen] = False | |
return x[x_vad], x_vad | |
def rolling_window(a, window): | |
shape = a.shape[:-1] + (a.shape[-1] - window + 1, window) | |
strides = a.strides + (a.strides[-1],) | |
return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides) | |
def laglead_energy_vad(x, fs, small_win_size=0.25, large_win_size=1., | |
up_threshold=1E-4, down_threshold=-1E-3): | |
'''Simple energy voice activity detection algorithm using lag-lead windows | |
''' | |
sz = x.itemsize | |
shp = x.shape | |
if len(shp) != 1: | |
raise ValueError("Input should be a 1D array") | |
small_win_samples = int(small_win_size * fs) | |
large_win_samples = int(large_win_size * fs) | |
small_overlap = rolling_window(np.abs(x), small_win_samples) | |
small_mean = np.mean(small_overlap, axis=-1) | |
large_overlap = rolling_window(np.abs(x), large_win_samples) | |
large_mean = np.mean(large_overlap, axis=-1) | |
lead_diff = large_mean - small_mean[:len(large_mean)] | |
lag_diff = large_mean - small_mean[-len(large_mean):] | |
ups = np.arange(len(lead_diff))[lead_diff > up_threshold] | |
downs = np.arange(len(lag_diff))[lag_diff < down_threshold] + (len(small_mean) - len(large_mean)) | |
vad = [] | |
is_up = False | |
ups = list(ups) | |
downs = list(downs) | |
for i in range(len(x)): | |
if is_up: | |
removing = True | |
while removing: | |
if len(ups) == 0: | |
break | |
if ups[0] < i: | |
ups.pop(0) | |
else: | |
removing = False | |
if len(downs) == 0: | |
vad.append(True) | |
continue | |
if i < downs[0]: | |
vad.append(True) | |
else: | |
vad.append(False) | |
is_up = True | |
else: | |
removing = True | |
while removing: | |
if len(downs) == 0: | |
break | |
if downs[0] < i: | |
downs.pop(0) | |
else: | |
removing = False | |
if len(ups) == 0: | |
vad.append(False) | |
continue | |
if i < ups[0]: | |
vad.append(False) | |
else: | |
vad.append(True) | |
is_up = True | |
vad = np.array(vad) | |
return x[vad], vad | |
def read_signal(filename, winsize): | |
wf=wave.open(filename,'rb') | |
n=wf.getnframes() | |
str=wf.readframes(n) | |
params = ((wf.getnchannels(), wf.getsampwidth(), | |
wf.getframerate(), wf.getnframes(), | |
wf.getcomptype(), wf.getcompname())) | |
siglen=((int )(len(str)/2/winsize) + 1) * winsize | |
signal=sp.zeros(siglen, sp.int16) | |
signal[0:len(str)/2] = sp.fromstring(str,sp.int16) | |
return [signal, params] | |
def get_frame(signal, winsize, no): | |
shift=winsize/2 | |
start=no*shift | |
end = start+winsize | |
return signal[start:end] | |
# from jfsantos | |
class LTSD(): | |
def __init__(self,winsize,window,order): | |
self.winsize = int(winsize) | |
self.window = window | |
self.order = order | |
self.amplitude = {} | |
def get_amplitude(self,signal,l): | |
if self.amplitude.has_key(l): | |
return self.amplitude[l] | |
else: | |
amp = sp.absolute(sp.fft(get_frame(signal, self.winsize,l) * self.window)) | |
self.amplitude[l] = amp | |
return amp | |
def compute_noise_avg_spectrum(self, nsignal): | |
windownum = int(len(nsignal)//(self.winsize//2) - 1) | |
avgamp = np.zeros(self.winsize) | |
for l in range(windownum): | |
avgamp += sp.absolute(sp.fft(get_frame(nsignal, self.winsize,l) * self.window)) | |
return avgamp/float(windownum) | |
def compute(self,signal): | |
self.windownum = int(len(signal)//(self.winsize//2) - 1) | |
ltsds = np.zeros(self.windownum) | |
#Calculate the average noise spectrum amplitude based 20 frames in the head parts of input signal. | |
self.avgnoise = self.compute_noise_avg_spectrum(signal[0:self.winsize*20])**2 | |
for l in xrange(self.windownum): | |
ltsds[l] = self.ltsd(signal,l,5) | |
return ltsds | |
def ltse(self,signal,l,order): | |
maxamp = np.zeros(self.winsize) | |
for idx in range(l-order,l+order+1): | |
amp = self.get_amplitude(signal,idx) | |
maxamp = np.maximum(maxamp,amp) | |
return maxamp | |
def ltsd(self,signal,l,order): | |
if l < order or l+order >= self.windownum: | |
return 0 | |
return 10.0*np.log10(np.sum(self.ltse(signal,l,order)**2/self.avgnoise)/float(len(self.avgnoise))) | |
def ltsd_vad(x, fs, threshold=9, winsize=8192): | |
window = sp.hanning(winsize) | |
ltsd = LTSD(winsize, window, 5) | |
s_vad = ltsd.compute(x) | |
# LTSD is 50% overlap, so each "step" covers 4096 samples | |
# +1 to cover the extra edge window | |
n_samples = int(((len(s_vad) + 1) * winsize) // 2) | |
time_s = n_samples / float(fs) | |
time_points = np.linspace(0, time_s, len(s_vad)) | |
time_samples = (fs * time_points).astype(np.int32) | |
time_samples = time_samples | |
f_vad = np.zeros_like(x, dtype=np.bool) | |
offset = winsize | |
for n, (ss, es) in enumerate(zip(time_samples[:-1], time_samples[1:])): | |
sss = ss - offset | |
if sss < 0: | |
sss = 0 | |
ses = es - offset | |
if ses < 0: | |
ses = 0 | |
if s_vad[n + 1] < threshold: | |
f_vad[sss:ses] = False | |
else: | |
f_vad[sss:ses] = True | |
f_vad[ses:] = False | |
return x[f_vad], f_vad | |
if __name__=="__main__": | |
WINSIZE=8192 | |
sound=sys.argv[1] | |
#signal, params = read_signal(sound,WINSIZE) | |
fs, d = wavfile.read(sound) | |
res, vad = ltsd_vad(d, fs, threshold=9) | |
""" | |
window = sp.hanning(WINSIZE) | |
ltsd = LTSD(WINSIZE, window, 5) | |
res = ltsd.compute(signal) | |
""" | |
import matplotlib | |
matplotlib.use('Agg') | |
import matplotlib.pyplot as plt | |
fig = plt.figure() | |
ax = fig.add_subplot(111) | |
ax.plot(res) | |
plt.savefig('res.png') | |
wavfile.write("out.wav", fs, res) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment