Last active
June 4, 2017 19:44
-
-
Save dev-zzo/b725f240e2045c922a02c245f59db428 to your computer and use it in GitHub Desktop.
Python vs DSP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
A set of simple DSP utilities with abysmal performance | |
""" | |
import math | |
import cmath | |
import struct | |
import util | |
# | |
# Notes: | |
# * http://www.edn.com/Pdf/ViewPdf?contentItemId=4216984 | |
# Resampling with polyphase filters | |
# | |
# | |
# Utility functions | |
# | |
def scaled(seq, sf): | |
return [x * sf for x in seq] | |
def normalized(seq, scale=1.0): | |
s = sum(seq) | |
if s == 0: | |
raise ValueError("cannot normalize a vector with zero sum") | |
return scaled(seq, scale / s) | |
def product(lhs, rhs): | |
if len(lhs) != len(rhs): | |
raise ValueError("vectors must have equal lengths") | |
return [lhs[i] * rhs[i] for i in xrange(0, len(lhs))] | |
def generate_complex_wave(fc, length): | |
w = complex(0, 2 * math.pi * fc) | |
return [cmath.exp(w * i) for i in xrange(0, length)] | |
def read_raw_iq(path, swap=False): | |
with open(path, 'rb') as fp: | |
samples = [] | |
while True: | |
bytes = fp.read(2 * 2) | |
if len(bytes) < 4: | |
break | |
vals = struct.unpack('<hh', bytes) | |
if swap: | |
samples.append(complex(vals[1] / 32767.0, vals[0] / 32767.0)) | |
else: | |
samples.append(complex(vals[0] / 32767.0, vals[1] / 32767.0)) | |
return samples | |
def write_raw_iq(path, samples): | |
with open(path, 'wb') as fp: | |
for s in samples: | |
fp.write(struct.pack('<hh', 32767 * s.real, 32767 * s.imag)) | |
def convolution(signal, kernel): | |
S = [] | |
for n in xrange(0, len(signal) + len(kernel)): | |
s = 0 | |
start_i = max(0, n - len(signal) + 1) | |
end_i = 1 + min(n, len(kernel) - 1) | |
for i in xrange(start_i, end_i): | |
s += signal[n - i] * kernel[i] | |
S.append(s) | |
return S | |
def correlation(signal, kernel): | |
l = len(kernel) | |
s = 0 | |
for n in xrange(l): | |
s += signal[n] * kernel[n] | |
return s | |
def heterodyne_complex_gen(signal, f): | |
"Heterodyne the `signal` with a tone of frequency `f`" | |
i = 0 | |
w0 = cmath.exp(complex(0, 2 * math.pi * f)) | |
w = complex(1) | |
while i < len(signal): | |
yield signal[i] * w | |
w *= w0 | |
i += 1 | |
def heterodyne_complex(signal, f): | |
"Heterodyne the `signal` with a tone of frequency `f`" | |
return list(heterodyne_complex_gen(signal, f)) | |
# | |
# DFT routines (SLOW) | |
# Remember to divide by N after the inverse transform | |
# | |
def dft_complex(input): | |
width = len(input) | |
output = [] | |
w1d = complex(0, -2 * math.pi / width) | |
w1 = 0 | |
for k in xrange(width): | |
X = 0 | |
w2d = cmath.exp(w1) | |
w2 = complex(1, 0) | |
for n in xrange(width): | |
X += input[n] * w2 | |
w2 *= w2d | |
output.append(X) | |
w1 += w1d | |
return output | |
def idft_complex(input): | |
width = len(input) | |
width_inv = 1.0 / width | |
output = [] | |
w1d = complex(0, 2 * math.pi / width) | |
w1 = 0 | |
for n in xrange(width): | |
X = 0 | |
w2d = cmath.exp(w1) | |
w2 = complex(1, 0) | |
for k in xrange(width): | |
X += input[k] * w2 | |
w2 *= w2d | |
output.append(X * width_inv) | |
w1 += w1d | |
return output | |
# | |
# FFT routines | |
# | |
def bitreverse_sort(input): | |
output = list(input) | |
half_length = len(input) // 2 | |
j = half_length | |
for i in xrange(1, len(input) - 1): | |
if i < j: | |
t = output[j] | |
output[j] = output[i] | |
output[i] = t | |
k = half_length | |
while k <= j: | |
j -= k | |
k = k >> 1 | |
j += k | |
return output | |
def log2(x): | |
return math.frexp(x)[1] - 1 | |
def fft_core(x): | |
length = len(x) | |
for l in xrange(1, log2(length) + 1): | |
le = 1 << l | |
le2 = le >> 1 | |
w = 2 * math.pi / le | |
s = cmath.exp(complex(0, -w)) | |
u = complex(1, 0) | |
for j in xrange(1, le2 + 1): | |
for i in xrange(j - 1, length, le): | |
o = i + le2 | |
t = x[o] * u | |
x[o] = x[i] - t | |
x[i] = x[i] + t | |
u *= s | |
def fft_complex(input): | |
"Computes a forward FFT transform for complex-valued input" | |
x = bitreverse_sort(input) | |
fft_core(x) | |
return x | |
def ifft_complex(input): | |
"Computes an inverse FFT transform for complex-valued input" | |
x = bitreverse_sort(input) | |
x = [ v.conjugate() for v in x ] | |
fft_core(x) | |
n_inv = 1.0 / len(x) | |
x = [ v.conjugate() * n_inv for v in x ] | |
return x | |
# | |
# Filters | |
# | |
def generate_hilbert(length): | |
"Generates a Hilbert transform FIR kernel" | |
if (length & 1) == 0: | |
raise ValueError("length must be odd") | |
zf = (length - 1) / 2 | |
return [ (2.0 / math.pi / x) if x & 1 else 0.0 for x in xrange(-zf, zf + 1) ] | |
def generate_hilbert_complex(length): | |
"Generates a complex Hilbert transform FIR kernel" | |
X = [] | |
half_length = length // 2 | |
for i in xrange(length): | |
if i < half_length: | |
X.append(complex(1, 0)) | |
else: | |
X.append(complex(0)) | |
x = idft_complex(X) | |
y = x[half_length + 1:] | |
y[half_length:] = x[0:half_length] | |
return y | |
def generate_sinc(fc, length): | |
"Generates a sinc kernel" | |
h = [] | |
w = 2 * math.pi * fc | |
zf = (length - 1) / 2 | |
for i in xrange(0, length): | |
x = i - zf | |
if x == 0: | |
h.append(w) | |
else: | |
h.append(math.sin(w * x) / x) | |
return h | |
def spectral_inversion(kernel): | |
"Flips the frequency response up-down" | |
new_kernel = [-x for x in kernel] | |
new_kernel[len(kernel) // 2] += 1 | |
return new_kernel | |
def spectral_reversal(kernel): | |
"Flips the frequency response left-right" | |
return [ [-x, x][i & 1] for i in xrange(0, len(kernel)) ] | |
# | |
# Window functions | |
# Ref: https://en.wikipedia.org/wiki/Window_function#A_list_of_window_functions | |
# | |
def generate_cosine_window_1(length, a, b): | |
w = (2 * math.pi) / (length - 1) | |
return [(a - b * math.cos(w * i)) for i in xrange(0, length)] | |
def generate_hamming_window(length): | |
return generate_cosine_window_1(length, 0.54, 0.46) | |
def generate_hann_window(length): | |
return generate_cosine_window_1(length, 0.5, 0.4) | |
def generate_cosine_window_2(length, a, b, c): | |
w = (2 * math.pi) / (length - 1) | |
return [(a - b * math.cos(w * i) + c * math.cos(2 * w * i)) for i in xrange(0, length)] | |
def generate_blackman_window(length): | |
return generate_cosine_window_2(length, 0.42, 0.5, 0.08) | |
def generate_cosine_window_3(length, a, b, c, d): | |
w = (2 * math.pi) / (length - 1) | |
return [(a - b * math.cos(w * i) + c * math.cos(2 * w * i) -d * math.cos(3 * w * i)) for i in xrange(0, length)] | |
def generate_blackman_nuttall_window(length): | |
return generate_cosine_window_3(length, 0.3635819, 0.4891775, 0.1365995, 0.0106411) | |
def generate_blackman_harris_window(length): | |
return generate_cosine_window_3(length, 0.35875, 0.48829, 0.14128, 0.01168) | |
# | |
# | |
def meddle_with_sinc(): | |
fp = open("dsp.csv", "w") | |
l = 4095 | |
my_sinc = generate_sinc(0.025, l) | |
sinc_blackman = normalized(product(my_sinc, generate_blackman_window(l))) | |
XX = [20*math.log10(abs(x)) for x in dft_complex(sinc_blackman)] | |
util.write_as_csv(XX, fp) | |
sinc_blackman_harris = normalized(product(my_sinc, generate_blackman_harris_window(l))) | |
XX = [20*math.log10(abs(x)) for x in dft_complex(sinc_blackman_harris)] | |
util.write_as_csv(XX, fp) | |
def meddle_with_convolution(): | |
s = [1, 4, -2, 5, 3] | |
k = [0, -1, 1, 0] | |
r = convolution(s, k) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment