Skip to content

Instantly share code, notes, and snippets.

@dev-zzo
Last active June 4, 2017 19:44
Show Gist options
  • Save dev-zzo/b725f240e2045c922a02c245f59db428 to your computer and use it in GitHub Desktop.
Save dev-zzo/b725f240e2045c922a02c245f59db428 to your computer and use it in GitHub Desktop.
Python vs DSP
"""
A set of simple DSP utilities with abysmal performance
"""
import math
import cmath
import struct
import util
#
# Notes:
# * http://www.edn.com/Pdf/ViewPdf?contentItemId=4216984
# Resampling with polyphase filters
#
#
# Utility functions
#
def scaled(seq, sf):
return [x * sf for x in seq]
def normalized(seq, scale=1.0):
s = sum(seq)
if s == 0:
raise ValueError("cannot normalize a vector with zero sum")
return scaled(seq, scale / s)
def product(lhs, rhs):
if len(lhs) != len(rhs):
raise ValueError("vectors must have equal lengths")
return [lhs[i] * rhs[i] for i in xrange(0, len(lhs))]
def generate_complex_wave(fc, length):
w = complex(0, 2 * math.pi * fc)
return [cmath.exp(w * i) for i in xrange(0, length)]
def read_raw_iq(path, swap=False):
with open(path, 'rb') as fp:
samples = []
while True:
bytes = fp.read(2 * 2)
if len(bytes) < 4:
break
vals = struct.unpack('<hh', bytes)
if swap:
samples.append(complex(vals[1] / 32767.0, vals[0] / 32767.0))
else:
samples.append(complex(vals[0] / 32767.0, vals[1] / 32767.0))
return samples
def write_raw_iq(path, samples):
with open(path, 'wb') as fp:
for s in samples:
fp.write(struct.pack('<hh', 32767 * s.real, 32767 * s.imag))
def convolution(signal, kernel):
S = []
for n in xrange(0, len(signal) + len(kernel)):
s = 0
start_i = max(0, n - len(signal) + 1)
end_i = 1 + min(n, len(kernel) - 1)
for i in xrange(start_i, end_i):
s += signal[n - i] * kernel[i]
S.append(s)
return S
def correlation(signal, kernel):
l = len(kernel)
s = 0
for n in xrange(l):
s += signal[n] * kernel[n]
return s
def heterodyne_complex_gen(signal, f):
"Heterodyne the `signal` with a tone of frequency `f`"
i = 0
w0 = cmath.exp(complex(0, 2 * math.pi * f))
w = complex(1)
while i < len(signal):
yield signal[i] * w
w *= w0
i += 1
def heterodyne_complex(signal, f):
"Heterodyne the `signal` with a tone of frequency `f`"
return list(heterodyne_complex_gen(signal, f))
#
# DFT routines (SLOW)
# Remember to divide by N after the inverse transform
#
def dft_complex(input):
width = len(input)
output = []
w1d = complex(0, -2 * math.pi / width)
w1 = 0
for k in xrange(width):
X = 0
w2d = cmath.exp(w1)
w2 = complex(1, 0)
for n in xrange(width):
X += input[n] * w2
w2 *= w2d
output.append(X)
w1 += w1d
return output
def idft_complex(input):
width = len(input)
width_inv = 1.0 / width
output = []
w1d = complex(0, 2 * math.pi / width)
w1 = 0
for n in xrange(width):
X = 0
w2d = cmath.exp(w1)
w2 = complex(1, 0)
for k in xrange(width):
X += input[k] * w2
w2 *= w2d
output.append(X * width_inv)
w1 += w1d
return output
#
# FFT routines
#
def bitreverse_sort(input):
output = list(input)
half_length = len(input) // 2
j = half_length
for i in xrange(1, len(input) - 1):
if i < j:
t = output[j]
output[j] = output[i]
output[i] = t
k = half_length
while k <= j:
j -= k
k = k >> 1
j += k
return output
def log2(x):
return math.frexp(x)[1] - 1
def fft_core(x):
length = len(x)
for l in xrange(1, log2(length) + 1):
le = 1 << l
le2 = le >> 1
w = 2 * math.pi / le
s = cmath.exp(complex(0, -w))
u = complex(1, 0)
for j in xrange(1, le2 + 1):
for i in xrange(j - 1, length, le):
o = i + le2
t = x[o] * u
x[o] = x[i] - t
x[i] = x[i] + t
u *= s
def fft_complex(input):
"Computes a forward FFT transform for complex-valued input"
x = bitreverse_sort(input)
fft_core(x)
return x
def ifft_complex(input):
"Computes an inverse FFT transform for complex-valued input"
x = bitreverse_sort(input)
x = [ v.conjugate() for v in x ]
fft_core(x)
n_inv = 1.0 / len(x)
x = [ v.conjugate() * n_inv for v in x ]
return x
#
# Filters
#
def generate_hilbert(length):
"Generates a Hilbert transform FIR kernel"
if (length & 1) == 0:
raise ValueError("length must be odd")
zf = (length - 1) / 2
return [ (2.0 / math.pi / x) if x & 1 else 0.0 for x in xrange(-zf, zf + 1) ]
def generate_hilbert_complex(length):
"Generates a complex Hilbert transform FIR kernel"
X = []
half_length = length // 2
for i in xrange(length):
if i < half_length:
X.append(complex(1, 0))
else:
X.append(complex(0))
x = idft_complex(X)
y = x[half_length + 1:]
y[half_length:] = x[0:half_length]
return y
def generate_sinc(fc, length):
"Generates a sinc kernel"
h = []
w = 2 * math.pi * fc
zf = (length - 1) / 2
for i in xrange(0, length):
x = i - zf
if x == 0:
h.append(w)
else:
h.append(math.sin(w * x) / x)
return h
def spectral_inversion(kernel):
"Flips the frequency response up-down"
new_kernel = [-x for x in kernel]
new_kernel[len(kernel) // 2] += 1
return new_kernel
def spectral_reversal(kernel):
"Flips the frequency response left-right"
return [ [-x, x][i & 1] for i in xrange(0, len(kernel)) ]
#
# Window functions
# Ref: https://en.wikipedia.org/wiki/Window_function#A_list_of_window_functions
#
def generate_cosine_window_1(length, a, b):
w = (2 * math.pi) / (length - 1)
return [(a - b * math.cos(w * i)) for i in xrange(0, length)]
def generate_hamming_window(length):
return generate_cosine_window_1(length, 0.54, 0.46)
def generate_hann_window(length):
return generate_cosine_window_1(length, 0.5, 0.4)
def generate_cosine_window_2(length, a, b, c):
w = (2 * math.pi) / (length - 1)
return [(a - b * math.cos(w * i) + c * math.cos(2 * w * i)) for i in xrange(0, length)]
def generate_blackman_window(length):
return generate_cosine_window_2(length, 0.42, 0.5, 0.08)
def generate_cosine_window_3(length, a, b, c, d):
w = (2 * math.pi) / (length - 1)
return [(a - b * math.cos(w * i) + c * math.cos(2 * w * i) -d * math.cos(3 * w * i)) for i in xrange(0, length)]
def generate_blackman_nuttall_window(length):
return generate_cosine_window_3(length, 0.3635819, 0.4891775, 0.1365995, 0.0106411)
def generate_blackman_harris_window(length):
return generate_cosine_window_3(length, 0.35875, 0.48829, 0.14128, 0.01168)
#
#
def meddle_with_sinc():
fp = open("dsp.csv", "w")
l = 4095
my_sinc = generate_sinc(0.025, l)
sinc_blackman = normalized(product(my_sinc, generate_blackman_window(l)))
XX = [20*math.log10(abs(x)) for x in dft_complex(sinc_blackman)]
util.write_as_csv(XX, fp)
sinc_blackman_harris = normalized(product(my_sinc, generate_blackman_harris_window(l)))
XX = [20*math.log10(abs(x)) for x in dft_complex(sinc_blackman_harris)]
util.write_as_csv(XX, fp)
def meddle_with_convolution():
s = [1, 4, -2, 5, 3]
k = [0, -1, 1, 0]
r = convolution(s, k)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment