Created
February 15, 2024 23:39
-
-
Save prydin/507f58d31617dbac869ac0df70da9252 to your computer and use it in GitHub Desktop.
AM demodulation pipeline
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Baseband sample rate | |
import math | |
import matplotlib.pyplot as plt | |
import numpy | |
from scipy import signal, fft | |
import numpy as np | |
# RF sampling rate | |
rf_rate = 6e6 | |
# Carrier frequency | |
carrier = 2e6 | |
# IF sample rate | |
if_rate = 2e5 | |
# Length of simulation run (must be power of 2) | |
n_samples = 65536 | |
# Post filter downsample ratio | |
downsample_ratio = 256 | |
class DCBlocker: | |
offset = 0 | |
rate = 0.01 | |
def process(self, x): | |
y = x - self.offset | |
self.offset += y * self.rate | |
return y | |
dc_blocker = DCBlocker() | |
class Filter: | |
p = 0 | |
buffer = [] | |
taps = [] | |
def __init__(self, taps): | |
self.taps = taps | |
self.buffer = [0] * len(self.taps) | |
def process(self, x): | |
self.buffer[self.p] = x | |
self.p -= 1 | |
if self.p < 0: | |
self.p = len(self.buffer) - 1 | |
# Convolve with taps. Buffer grows towards lower index | |
# so convolution should iterate toward higher index | |
k = self.p | |
acc = 0 | |
for i in range(len(self.buffer)): | |
acc += self.buffer[k] * self.taps[i] | |
k += 1 | |
if k >= len(self.buffer): | |
k = 0 | |
return acc | |
# 5kHz bandwidth filter | |
taps_5k = signal.firwin(100, 2000, fs=rf_rate, pass_zero="lowpass", window="hann") | |
filter_5k_i = Filter(taps_5k) | |
filter_5k_q = Filter(taps_5k) | |
class Filter: | |
taps = [] | |
buffer = [] | |
p = 0 | |
def __init__(self, taps): | |
self.taps = taps[len(taps)/2:] | |
self.buffer = [0] * len(self.taps) | |
def process(self, x): | |
self.buffer[self.p] = x | |
self.p -= 1 | |
if self.p < 0: | |
self.p = len(self.buffer) - 1 | |
# Convolve with taps. Buffer grows towards lower index | |
# so convolution should iterate toward higher index | |
k = self.p | |
acc = 0 | |
for i in range(len(p)): | |
acc += self.buffer[k] * self.taps[i] | |
k += 1 | |
if k >= self.len(self.buffer): | |
k = 0 | |
return acc | |
def osc(t, rate, f, phase): | |
return math.cos((t * f / rate) * 2 * math.pi + phase) | |
def gen_am(t, rate, f, siggen, weight): | |
return osc(t, rate, f, 0) * (siggen(t, rate) * weight + (1 - weight)) | |
def tone_1khz(t, rate): | |
return osc(t, rate, 1000, 0) | |
def two_tones(t, rate): | |
return (osc(t, rate, 1000, 0) + osc(t, rate, 1500, math.pi / 4)) / 2 | |
def tone_10khz(t, rate): | |
return osc(t, rate, 100000, 0) | |
def downshift(t, x, rate, f_baseband, f_i): | |
return x * osc(t, rate, f_baseband - f_i, 0) | |
def downsample(t, x, base_rate, new_rate, handler): | |
i = t * base_rate | |
if i % new_rate == 0: | |
handler(t, x, new_rate) | |
def to_quad(t, x, rate, f): | |
return x * osc(t, rate, f, 0), x * osc(t, rate, f, math.pi / 2) | |
def pipeline(t, rate): | |
# Simulate two AM stations transmitting with 50000kHz separation | |
x = gen_am(t, rate, carrier, two_tones, 0.5) + gen_am(t, carrier + 50000, rate, tone_1khz, 0.3) | |
# Downshift to baseband | |
i, q = to_quad(t, x, rate, carrier) | |
# Filter to desired bandwidth | |
i = filter_5k_i.process(i) | |
q = filter_5k_q.process(q) | |
# Combine I and Q (TODO: Is addition the right operation?) | |
x = i + q | |
# Remove DC | |
x = dc_blocker.process(x) | |
return x | |
def output_generator(rate, n_samples, downsample_ratio): | |
# EXTREMELY naive downsample algorithm. Just pick the sample | |
# that happens to be ad an even multiple of the downsample_ratio | |
t = 0 | |
n = 0 | |
for i in range(n_samples): | |
y = pipeline(t, rate) | |
t += 1 | |
n += 1 | |
if n >= downsample_ratio: | |
n = 0 | |
yield y | |
def run(): | |
audio_rate = rf_rate / downsample_ratio | |
v = list(output_generator(rf_rate, n_samples, downsample_ratio)) | |
w = signal.windows.hann(len(v)) | |
y = fft.fft(v*w, norm="forward") | |
ax = plt.subplot() | |
xaxis_gen = (i * audio_rate / len(v) for i in range(int(len(v) / 2))) | |
# ax.plot(list(xaxis_gen), 10*np.log(np.absolute(y[0:int(len(v)/2)]))) | |
ax.plot(range(len(v)), v) | |
plt.show() | |
# Main entry point | |
run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment