Skip to content

Instantly share code, notes, and snippets.

@f0k
Last active January 15, 2023 22:32
Show Gist options
  • Save f0k/0e50729c169114fdc6cb41be013a499f to your computer and use it in GitHub Desktop.
Save f0k/0e50729c169114fdc6cb41be013a499f to your computer and use it in GitHub Desktop.
STFT Benchmarks on CPU and GPU in Python

STFT Benchmarks on CPU and GPU in Python

Currently implemented:

  • numpy (pyfftw recommended: install libfftw3-dev and pip install pyfftw)
  • Theano (one version using conv1d, one using cuFFT)
  • cupy (using cuFFT, requires bleeding-edge version from github)

Results for a 2-minute test signal (without CPU/GPU transfer):

  • numpy: 0.044s
  • theano_conv: 0.008s
  • theano_fft: 0.002s
  • cupy: 0.002s

Feel free to reuse any implementation if including the LICENSE.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Computes the spectrogram of a test signal using cupy and cuFFT.
Author: Jan Schlüter
"""
import sys
import os
import timeit
import numpy as np
import cupy as cp
INPUT_ON_GPU = True
OUTPUT_ON_GPU = True
from testfile import make_test_signal
def spectrogram(signal, sample_rate=22050, frame_len=1024, fps=70):
"""
Computes a magnitude spectrogram at a given sample rate (in Hz), frame
length (in samples) and frame rate (in Hz), on CUDA using cupy.
"""
if not INPUT_ON_GPU:
signal = cp.array(signal.astype(np.float32)) # already blown up to a list of frames
win = cp.hanning(frame_len).astype(cp.float32)
# apply window function
#signal *= win # this doesn't work correctly for some reason.
signal = signal * win
# perform FFT
spect = cp.fft.rfft(signal)
# convert into magnitude spectrogram
spect = cp.abs(spect)
# return
if OUTPUT_ON_GPU:
cp.cuda.get_current_stream().synchronize()
else:
return spect.get()
def main():
# load input
global x, spectrogram
x = make_test_signal()
# we do the following here because cupy cannot do stride tricks
# the actual copying work is included in the benchmark unless INPUT_ON_GPU
hop_size = 22050 // 70
frame_len = 1024
frames = len(x) - frame_len + 1
x = np.lib.stride_tricks.as_strided(
x, (frames, frame_len), (x.strides[0], x.strides[0]))[::hop_size]
if INPUT_ON_GPU:
x = cp.array(x.astype(np.float32))
# benchmark
times = timeit.repeat(
setup='from __main__ import x, spectrogram',
stmt='spectrogram(x)',
repeat=5, number=32)
print("Took %.3fs." % (min(times) / 32))
# save result
#assert not OUTPUT_ON_GPU
#np.save(sys.argv[0][:-2] + 'npy', spectrogram(x))
if __name__=="__main__":
main()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Computes the spectrogram of a test signal using numpy and fftw.
Author: Jan Schlüter
"""
import sys
import os
import timeit
import numpy as np
try:
from pyfftw.builders import rfft as rfft_builder
except ImportError:
def rfft_builder(*args, **kwargs):
return np.fft.rfft
from testfile import make_test_signal
INPUT_AS_FLOAT = False
def spectrogram(samples, sample_rate=22050, frame_len=1024, fps=70, batch=50):
"""
Computes a magnitude spectrogram for a given vector of samples at a given
sample rate (in Hz), frame length (in samples) and frame rate (in Hz).
Allows to transform multiple frames at once for improved performance (with
a default value of 50, more is not always better). Returns a numpy array.
"""
if len(samples) < frame_len:
return np.empty((0, frame_len // 2 + 1), dtype=samples.dtype)
win = np.hanning(frame_len).astype(samples.dtype)
hopsize = sample_rate // fps
num_frames = max(0, (len(samples) - frame_len) // hopsize + 1)
batch = min(batch, num_frames)
if batch <= 1 or not samples.flags.c_contiguous:
rfft = rfft_builder(samples[:frame_len], n=frame_len)
spect = np.vstack(np.abs(rfft(samples[pos:pos + frame_len] * win))
for pos in range(0, len(samples) - frame_len + 1,
int(hopsize)))
else:
rfft = rfft_builder(np.empty((batch, frame_len), samples.dtype),
n=frame_len, threads=1)
frames = np.lib.stride_tricks.as_strided(
samples, shape=(num_frames, frame_len),
strides=(samples.strides[0] * hopsize, samples.strides[0]))
spect = [np.abs(rfft(frames[pos:pos + batch] * win))
for pos in range(0, num_frames - batch + 1, batch)]
if num_frames % batch:
spect.append(spectrogram(
samples[(num_frames // batch * batch) * hopsize:],
sample_rate, frame_len, fps, batch=1))
spect = np.vstack(spect)
return spect
def main():
# load input
global x
x = make_test_signal()
if INPUT_AS_FLOAT:
x = x.astype(np.float32)
# benchmark
times = timeit.repeat(
setup='from __main__ import x, spectrogram, np',
stmt='spectrogram(np.asarray(x, np.float32))',
repeat=5, number=32)
print("Took %.3fs." % (min(times) / 32))
# save result
#np.save(sys.argv[0][:-2] + 'npy', spectrogram(x.astype(np.float32)))
if __name__=="__main__":
main()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Computes the spectrogram of a test signal using Theano and conv1d.
Author: Jan Schlüter
"""
import sys
import os
import timeit
import numpy as np
import theano
import theano.tensor as T
from testfile import make_test_signal
INPUT_ON_GPU = True
OUTPUT_ON_GPU = True
def compile_spectrogram_fn(sample_rate=22050, frame_len=1024, fps=70):
"""
Compiles a Theano function for computing a magnitude spectrogram at a given
sample rate (in Hz), frame length (in samples) and frame rate (in Hz).
"""
if INPUT_ON_GPU:
global x
signal = theano.shared(x.astype(np.float32), name='signal')
else:
signal = T.vector('signal')
win = np.hanning(frame_len)
hopsize = sample_rate // fps
bins = frame_len // 2 + 1
# create DFT matrix (separated real and imaginary parts)
t = np.arange(frame_len, dtype=np.float64)
k = np.arange(bins, dtype=np.float64)
W_real = np.cos(t / frame_len * 2 * np.pi * k[:, np.newaxis])
W_imag = -np.sin(t / frame_len * 2 * np.pi * k[:, np.newaxis])
# interleave parts and multiply by window
#W = np.concatenate((W_real, W_imag), axis=0) # concatenation
W = np.empty((2 * bins, frame_len))
W[::2] = W_real
W[1::2] = W_imag
W = W * win
# define Theano expression for STFT using strided convolution
W = T.constant(W[:, np.newaxis].astype(np.float32), name='W')
try:
conv1d = T.nnet.abstract_conv.AbstractConv(
convdim=1, imshp=(1, 1, None), kshp=(1, 2 * bins, frame_len),
subsample=hopsize)
spect = conv1d(signal.dimshuffle('x', 'x', 0), W)
except Exception:
spect = T.nnet.conv2d(
signal.dimshuffle('x', 'x', 'x', 0),
W.dimshuffle(0, 1, 'x', 2),
input_shape=(1, 1, 1, None),
filter_shape=(1, 2 * bins, 1, frame_len),
subsample=(1, hopsize),
# much slower shape:
#signal.dimshuffle('x', 'x', 0, 'x'),
#W.dimshuffle(0, 1, 2, 'x'),
#input_shape=(1, 1, None, 1),
#filter_shape=(1, 2 * bins, frame_len, 1),
#subsample=(hopsize, 1),
)
# convert into magnitude spectrogram
spect = T.square(spect)
spect = T.sqrt(spect[:, ::2] + spect[:, 1::2])
# compile function
if OUTPUT_ON_GPU:
spect = theano.gpuarray.as_gpuarray_variable(spect, None)
if INPUT_ON_GPU:
return theano.function([], spect)
else:
return theano.function([signal], spect)
def main():
# load input
global x, spectrogram
x = make_test_signal()
spectrogram = compile_spectrogram_fn()
# benchmark
times = timeit.repeat(
setup='from __main__ import x, spectrogram',
stmt='spectrogram(%s)%s' % (
'x' if not INPUT_ON_GPU else '',
'.sync()' if OUTPUT_ON_GPU else ''),
repeat=5, number=32)
print("Took %.3fs." % (min(times) / 32))
# save result
#np.save(sys.argv[0][:-2] + 'npy',
# np.squeeze(spectrogram(x) if not INPUT_ON_GPU else spectrogram()).T)
if __name__=="__main__":
main()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Computes the spectrogram of a test signal using Theano and cuFFT.
Author: Jan Schlüter
"""
import sys
import os
import timeit
import numpy as np
import theano
import theano.tensor as T
try:
from theano.gpuarray.fft import curfft as rfft
except Exception:
print ("cuFFT not available, will perform FFT on CPU")
from theano.tensor.fft import rfft
from testfile import make_test_signal
INPUT_ON_GPU = True
OUTPUT_ON_GPU = True
def compile_spectrogram_fn(sample_rate=22050, frame_len=1024, fps=70):
"""
Compiles a Theano function for computing a magnitude spectrogram at a given
sample rate (in Hz), frame length (in samples) and frame rate (in Hz).
"""
if INPUT_ON_GPU:
global x
signal = theano.shared(x.astype(np.float32), name='signal')
else:
signal = T.matrix('signal') # already blown up to a list of frames
win = np.hanning(frame_len)
# apply window function
signal = signal * T.constant(win.astype(np.float32))
# define Theano expression for STFT using FFT
spect = rfft(signal)
# convert into magnitude spectrogram
spect = T.square(spect)
spect = T.sqrt(spect[:, :, 0] + spect[:, :, 1])
# compile function
if OUTPUT_ON_GPU:
spect = theano.gpuarray.as_gpuarray_variable(spect, None)
if INPUT_ON_GPU:
return theano.function([], spect)
else:
return theano.function([signal], spect)
def main():
# load input
global x, spectrogram
x = make_test_signal()
# we do the following here because Theano cannot do stride tricks
# the actual copying work is included in the benchmark unless INPUT_ON_GPU
hop_size = 22050 // 70
frame_len = 1024
frames = len(x) - frame_len + 1
x = np.lib.stride_tricks.as_strided(
x, (frames, frame_len), (x.strides[0], x.strides[0]))[::hop_size]
spectrogram = compile_spectrogram_fn()
# benchmark
times = timeit.repeat(
setup='from __main__ import x, spectrogram, np',
stmt='spectrogram(%s)%s' % (
'x' if not INPUT_ON_GPU else '',
'.sync()' if OUTPUT_ON_GPU else ''),
repeat=5, number=32)
print("Took %.3fs." % (min(times) / 32))
# save result
#np.save(sys.argv[0][:-2] + 'npy',
# spectrogram(x) if not INPUT_ON_GPU else spectrogram())
if __name__=="__main__":
main()
MIT License
Copyright (c) 2017 Jan Schlüter
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
# -*- coding: utf-8 -*-
"""
Provides a 2-minute test file for STFT experiments.
Author: Jan Schlüter
"""
import numpy as np
def make_test_signal(length=22050 * 120, seed=42):
"""
Creates a test signal of `length` samples, with given random `seed`,
returned as a numpy array of dtype np.int16.
"""
rng = np.random.RandomState(seed)
t = 2 * np.pi * np.arange(length) / 22050.
s = sum(rng.randn() * np.sin(t * (1000 + 10000 * rng.rand() +
10 * np.sin(t * rng.rand())))
for _ in range(10))
s *= (2**15 - 1) / s.max()
return s.astype(np.int16)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment