|
#!/usr/bin/env python |
|
# -*- coding: utf-8 -*- |
|
|
|
""" |
|
Computes the spectrogram of a test signal using Theano and conv1d. |
|
|
|
Author: Jan Schlüter |
|
""" |
|
|
|
import sys |
|
import os |
|
import timeit |
|
|
|
import numpy as np |
|
import theano |
|
import theano.tensor as T |
|
|
|
from testfile import make_test_signal |
|
|
|
# Benchmark switches read by compile_spectrogram_fn() and main().

# True: the test signal is compiled into the function as a Theano shared
# variable, so the compiled function is called without arguments.
# False: the signal is passed as an argument on every call.
INPUT_ON_GPU = True

# True: the result is wrapped with theano.gpuarray.as_gpuarray_variable()
# and main() calls .sync() on it inside the timed statement.
OUTPUT_ON_GPU = True
|
|
|
|
|
def compile_spectrogram_fn(sample_rate=22050, frame_len=1024, fps=70):
    """
    Compiles a Theano function for computing a magnitude spectrogram at a given
    sample rate (in Hz), frame length (in samples) and frame rate (in Hz).

    The STFT is expressed as a strided convolution of the signal with a bank
    of Hann-windowed DFT basis functions. Real and imaginary filters are
    interleaved along the filter axis, so the magnitude is obtained by
    squaring the convolution output, adding neighbouring filter pairs and
    taking the square root.

    Parameters:
        sample_rate: sample rate of the input signal in Hz.
        frame_len: number of samples per analysis frame; the spectrogram has
            ``frame_len // 2 + 1`` frequency bins.
        fps: frame rate in Hz; the hop size is ``sample_rate // fps`` samples.

    Returns:
        A compiled Theano function. If ``INPUT_ON_GPU`` is true it takes no
        arguments and processes the module-level signal ``x`` (which must be
        set before this function is called); otherwise it takes the signal
        vector as its only argument. The output keeps the singleton axes of
        the convolution result (the commented-out save code in main() removes
        them with ``np.squeeze``).

    Raises:
        ValueError: if ``sample_rate // fps`` is smaller than one sample.
    """
    hopsize = sample_rate // fps
    if hopsize < 1:
        # a stride below one sample cannot be used for the convolution
        raise ValueError(
            "sample_rate // fps must be at least 1 sample, got %d "
            "(sample_rate=%r, fps=%r)" % (hopsize, sample_rate, fps))

    if INPUT_ON_GPU:
        # the signal is baked into the function as a shared variable, read
        # from the module-level `x` that main() sets
        global x
        signal = theano.shared(x.astype(np.float32), name='signal')
    else:
        signal = T.vector('signal')
    win = np.hanning(frame_len)
    bins = frame_len // 2 + 1

    # create DFT matrix (separated real and imaginary parts)
    t = np.arange(frame_len, dtype=np.float64)
    k = np.arange(bins, dtype=np.float64)
    phase = t / frame_len * 2 * np.pi * k[:, np.newaxis]
    W_real = np.cos(phase)
    W_imag = -np.sin(phase)

    # interleave parts (even rows: real, odd rows: imaginary) and multiply
    # by window
    W = np.empty((2 * bins, frame_len))
    W[::2] = W_real
    W[1::2] = W_imag
    W = W * win

    # define Theano expression for STFT using strided convolution;
    # the filter tensor has shape (2 * bins, 1, frame_len)
    W = T.constant(W[:, np.newaxis].astype(np.float32), name='W')
    try:
        # shape hints must describe the tensors actually passed:
        # kshp matches W, i.e. (2 * bins, 1, frame_len)
        conv1d = T.nnet.abstract_conv.AbstractConv(
            convdim=1, imshp=(1, 1, None), kshp=(2 * bins, 1, frame_len),
            subsample=hopsize)
        spect = conv1d(signal.dimshuffle('x', 'x', 0), W)
    except Exception:
        # Deliberately broad best-effort fallback: whenever the 1D abstract
        # convolution cannot be constructed or applied (presumably because
        # the installed Theano has no 1D support), use conv2d with a dummy
        # row axis of length 1 instead.
        # Note from the original author: the alternative layout with the
        # time axis on the rows (dimshuffle('x', 'x', 0, 'x') for the signal,
        # dimshuffle(0, 1, 2, 'x') for W, subsample=(hopsize, 1)) was
        # much slower.
        spect = T.nnet.conv2d(
            signal.dimshuffle('x', 'x', 'x', 0),
            W.dimshuffle(0, 1, 'x', 2),
            input_shape=(1, 1, 1, None),
            # matches W.dimshuffle(0, 1, 'x', 2): (2 * bins, 1, 1, frame_len)
            filter_shape=(2 * bins, 1, 1, frame_len),
            subsample=(1, hopsize),
        )

    # convert into magnitude spectrogram: axis 1 interleaves real (even) and
    # imaginary (odd) filter responses, so add the squared pairs
    spect = T.square(spect)
    spect = T.sqrt(spect[:, ::2] + spect[:, 1::2])

    # compile function
    if OUTPUT_ON_GPU:
        # return a GPU array variable (context name None); main() calls
        # .sync() on the result in this mode
        spect = theano.gpuarray.as_gpuarray_variable(spect, None)
    if INPUT_ON_GPU:
        return theano.function([], spect)
    else:
        return theano.function([signal], spect)
|
|
|
|
|
def main():
    """
    Benchmarks the compiled spectrogram function on the test signal.

    Creates the test signal, compiles the spectrogram function, runs it in
    5 repetitions of 32 calls each and prints the best average time per call.
    """
    # load input; `x` and `spectrogram` are kept as module-level globals
    # because compile_spectrogram_fn() reads `x` when INPUT_ON_GPU is set
    global x, spectrogram
    x = make_test_signal()
    spectrogram = compile_spectrogram_fn()

    # Build the timed statement as a callable instead of a source string with
    # a ``from __main__ import ...`` setup, so that main() also works when
    # this module is imported rather than executed as the main script.
    if INPUT_ON_GPU:
        compute = spectrogram
    else:
        def compute():
            return spectrogram(x)
    if OUTPUT_ON_GPU:
        def timed_call():
            # presumably blocks until the GPU result is ready, so the
            # measured time covers the whole computation
            compute().sync()
    else:
        timed_call = compute

    # benchmark
    calls_per_run = 32
    times = timeit.repeat(timed_call, repeat=5, number=calls_per_run)
    print("Took %.3fs." % (min(times) / calls_per_run))

    # save result
    #np.save(sys.argv[0][:-2] + 'npy',
    #        np.squeeze(spectrogram(x) if not INPUT_ON_GPU else spectrogram()).T)
|
|
|
|
|
# Run the benchmark only when this file is executed as a script.
if __name__ == "__main__":
    main()